Merge pull request #12474 from ieQu1/dev/ds-rest-api
feat(sessds): Expose relevant durable session info in the REST API
This commit is contained in:
commit
34c3c4e892
|
@ -162,12 +162,21 @@
|
|||
-type replies() :: emqx_session:replies().
|
||||
|
||||
-define(STATS_KEYS, [
|
||||
durable,
|
||||
subscriptions_cnt,
|
||||
subscriptions_max,
|
||||
inflight_cnt,
|
||||
inflight_max,
|
||||
mqueue_len,
|
||||
mqueue_dropped
|
||||
mqueue_dropped,
|
||||
seqno_q1_comm,
|
||||
seqno_q1_dup,
|
||||
seqno_q1_next,
|
||||
seqno_q2_comm,
|
||||
seqno_q2_dup,
|
||||
seqno_q2_rec,
|
||||
seqno_q2_next,
|
||||
n_streams
|
||||
]).
|
||||
|
||||
%%
|
||||
|
@ -214,6 +223,8 @@ info(id, #{id := ClientID}) ->
|
|||
ClientID;
|
||||
info(clientid, #{id := ClientID}) ->
|
||||
ClientID;
|
||||
info(durable, _) ->
|
||||
true;
|
||||
info(created_at, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_created_at(S);
|
||||
info(is_persistent, #{}) ->
|
||||
|
@ -249,6 +260,26 @@ info(mqueue_dropped, _Session) ->
|
|||
% AwaitingRel;
|
||||
%% info(awaiting_rel_cnt, #{s := S}) ->
|
||||
%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
|
||||
info(seqno_q1_comm, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S);
|
||||
info(seqno_q1_dup, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S);
|
||||
info(seqno_q1_next, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S);
|
||||
info(seqno_q2_comm, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S);
|
||||
info(seqno_q2_dup, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S);
|
||||
info(seqno_q2_rec, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?rec, S);
|
||||
info(seqno_q2_next, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S);
|
||||
info(n_streams, #{s := S}) ->
|
||||
emqx_persistent_session_ds_state:fold_streams(
|
||||
fun(_, _, Acc) -> Acc + 1 end,
|
||||
0,
|
||||
S
|
||||
);
|
||||
info(awaiting_rel_max, #{props := Conf}) ->
|
||||
maps:get(max_awaiting_rel, Conf);
|
||||
info(await_rel_timeout, #{props := _Conf}) ->
|
||||
|
|
|
@ -132,6 +132,7 @@
|
|||
-type replies() :: emqx_session:replies().
|
||||
|
||||
-define(STATS_KEYS, [
|
||||
durable,
|
||||
subscriptions_cnt,
|
||||
subscriptions_max,
|
||||
inflight_cnt,
|
||||
|
@ -254,6 +255,8 @@ info(created_at, #session{created_at = CreatedAt}) ->
|
|||
CreatedAt;
|
||||
info(is_persistent, #session{is_persistent = IsPersistent}) ->
|
||||
IsPersistent;
|
||||
info(durable, _) ->
|
||||
false;
|
||||
info(subscriptions, #session{subscriptions = Subs}) ->
|
||||
Subs;
|
||||
info(subscriptions_cnt, #session{subscriptions = Subs}) ->
|
||||
|
|
|
@ -621,7 +621,67 @@ fields(client) ->
|
|||
" Maximum number of subscriptions allowed by this client">>
|
||||
})},
|
||||
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
|
||||
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}
|
||||
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})},
|
||||
{durable, hoconsc:mk(boolean(), #{desc => <<"Session is durable">>})},
|
||||
{n_streams,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc => <<"Number of streams used by the durable session">>
|
||||
})},
|
||||
|
||||
{seqno_q1_comm,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of the last PUBACK received from the client "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})},
|
||||
{seqno_q1_dup,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of the last QoS1 message sent to the client, that hasn't been acked "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})},
|
||||
{seqno_q1_next,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of next QoS1 message to be added to the batch "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})},
|
||||
|
||||
{seqno_q2_comm,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of the last PUBCOMP received from the client "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})},
|
||||
{seqno_q2_dup,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of last unacked QoS2 PUBLISH message sent to the client "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})},
|
||||
{seqno_q2_rec,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<"Sequence number of last PUBREC received from the client (Durable sessions only)">>
|
||||
})},
|
||||
{seqno_q2_next,
|
||||
hoconsc:mk(non_neg_integer(), #{
|
||||
desc =>
|
||||
<<
|
||||
"Sequence number of next QoS2 message to be added to the batch "
|
||||
"(Durable sessions only)"
|
||||
>>
|
||||
})}
|
||||
];
|
||||
fields(authz_cache) ->
|
||||
[
|
||||
|
@ -1588,7 +1648,8 @@ client_example() ->
|
|||
<<"recv_msg">> => 0,
|
||||
<<"recv_pkt">> => 4,
|
||||
<<"recv_cnt">> => 4,
|
||||
<<"recv_msg.qos0">> => 0
|
||||
<<"recv_msg.qos0">> => 0,
|
||||
<<"durable">> => false
|
||||
}.
|
||||
|
||||
message_example() ->
|
||||
|
|
Loading…
Reference in New Issue