diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 8cf3cb284..00bb145d6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -162,12 +162,21 @@ -type replies() :: emqx_session:replies(). -define(STATS_KEYS, [ + durable, subscriptions_cnt, subscriptions_max, inflight_cnt, inflight_max, mqueue_len, - mqueue_dropped + mqueue_dropped, + seqno_q1_comm, + seqno_q1_dup, + seqno_q1_next, + seqno_q2_comm, + seqno_q2_dup, + seqno_q2_rec, + seqno_q2_next, + n_streams ]). %% @@ -214,6 +223,8 @@ info(id, #{id := ClientID}) -> ClientID; info(clientid, #{id := ClientID}) -> ClientID; +info(durable, _) -> + true; info(created_at, #{s := S}) -> emqx_persistent_session_ds_state:get_created_at(S); info(is_persistent, #{}) -> @@ -249,6 +260,26 @@ info(mqueue_dropped, _Session) -> % AwaitingRel; %% info(awaiting_rel_cnt, #{s := S}) -> %% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); +info(seqno_q1_comm, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S); +info(seqno_q1_dup, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S); +info(seqno_q1_next, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S); +info(seqno_q2_comm, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S); +info(seqno_q2_dup, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S); +info(seqno_q2_rec, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?rec, S); +info(seqno_q2_next, #{s := S}) -> + emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S); +info(n_streams, #{s := S}) -> + emqx_persistent_session_ds_state:fold_streams( + fun(_, _, Acc) -> Acc + 1 end, + 0, + S + ); info(awaiting_rel_max, #{props := Conf}) -> maps:get(max_awaiting_rel, Conf); info(await_rel_timeout, #{props := _Conf}) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index dbb440f41..439057384 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -132,6 +132,7 @@ -type replies() :: emqx_session:replies(). -define(STATS_KEYS, [ + durable, subscriptions_cnt, subscriptions_max, inflight_cnt, @@ -254,6 +255,8 @@ info(created_at, #session{created_at = CreatedAt}) -> CreatedAt; info(is_persistent, #session{is_persistent = IsPersistent}) -> IsPersistent; +info(durable, _) -> + false; info(subscriptions, #session{subscriptions = Subs}) -> Subs; info(subscriptions_cnt, #session{subscriptions = Subs}) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 5a4c7baed..1b79689f0 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -621,7 +621,67 @@ fields(client) -> " Maximum number of subscriptions allowed by this client">> })}, {username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})}, - {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})} + {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}, + {durable, hoconsc:mk(boolean(), #{desc => <<"Session is durable">>})}, + {n_streams, + hoconsc:mk(non_neg_integer(), #{ + desc => <<"Number of streams used by the durable session">> + })}, + + {seqno_q1_comm, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of the last PUBACK received from the client " + "(Durable sessions only)" + >> + })}, + {seqno_q1_dup, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of the last QoS1 message sent to the client, that hasn't been acked " + "(Durable sessions only)" + >> + })}, + {seqno_q1_next, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of next QoS1 message to be added to the batch " + "(Durable sessions only)" + >> + })}, + + {seqno_q2_comm, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of the last PUBCOMP received from the client " + "(Durable sessions only)" + >> + })}, + {seqno_q2_dup, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of last unacked QoS2 PUBLISH message sent to the client " + "(Durable sessions only)" + >> + })}, + {seqno_q2_rec, + hoconsc:mk(non_neg_integer(), #{ + desc => + <<"Sequence number of last PUBREC received from the client (Durable sessions only)">> + })}, + {seqno_q2_next, + hoconsc:mk(non_neg_integer(), #{ + desc => + << + "Sequence number of next QoS2 message to be added to the batch " + "(Durable sessions only)" + >> + })} ]; fields(authz_cache) -> [ @@ -1588,7 +1648,8 @@ client_example() -> <<"recv_msg">> => 0, <<"recv_pkt">> => 4, <<"recv_cnt">> => 4, - <<"recv_msg.qos0">> => 0 + <<"recv_msg.qos0">> => 0, + <<"durable">> => false }. message_example() ->