Merge pull request #12500 from thalesmg/ds-list-client-api-m-20240209

feat(ds): list disconnected persistent sessions in clients API
This commit is contained in:
Thales Macedo Garitezi 2024-02-21 09:02:27 -03:00 committed by GitHub
commit 529211b9ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 713 additions and 34 deletions

View File

@ -36,10 +36,20 @@
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
-export([get_subscriptions/1, put_subscription/4, del_subscription/3]).
-export([make_session_iterator/0, session_iterator_next/2]).
-export([
make_session_iterator/0,
session_iterator_next/2,
session_count/0
]).
-export_type([
t/0, metadata/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, session_iterator/0
t/0,
metadata/0,
subscriptions/0,
seqno_type/0,
stream_key/0,
rank_key/0,
session_iterator/0
]).
-include("emqx_mqtt.hrl").
@ -359,17 +369,18 @@ del_rank(Key, Rec) ->
fold_ranks(Fun, Acc, Rec) ->
gen_fold(ranks, Fun, Acc, Rec).
-spec session_count() -> non_neg_integer().
session_count() ->
%% N.B.: this is potentially costly. Should not be called in hot paths.
%% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse...
do_session_count(make_session_iterator(), 0).
-spec make_session_iterator() -> session_iterator().
make_session_iterator() ->
case mnesia:dirty_first(?session_tab) of
'$end_of_table' ->
'$end_of_table';
Key ->
Key
end.
mnesia:dirty_first(?session_tab).
-spec session_iterator_next(session_iterator(), pos_integer()) ->
{[{emqx_persistent_session_ds:id(), metadata()}], session_iterator()}.
{[{emqx_persistent_session_ds:id(), metadata()}], session_iterator() | '$end_of_table'}.
session_iterator_next(Cursor, 0) ->
{[], Cursor};
session_iterator_next('$end_of_table', _N) ->
@ -564,6 +575,18 @@ ro_transaction(Fun) ->
%% {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
%% Res.
%%
do_session_count('$end_of_table', N) ->
N;
do_session_count(Cursor, N) ->
case session_iterator_next(Cursor, 1) of
{[], _} ->
N;
{_, NextCursor} ->
do_session_count(NextCursor, N + 1)
end.
-compile({inline, check_sequence/1}).
-ifdef(CHECK_SEQNO).

View File

@ -66,6 +66,9 @@
do_kickout_clients/1
]).
%% Internal exports
-export([lookup_running_client/2]).
%% Internal functions
-export([do_call_client/2]).
@ -314,10 +317,16 @@ nodes_info_count(PropList) ->
%%--------------------------------------------------------------------
lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- emqx:running_nodes()
]);
IsPersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
case lookup_running_client(ClientId, FormatFun) of
[] when IsPersistenceEnabled ->
case emqx_persistent_session_ds_state:print_session(ClientId) of
undefined -> [];
Session -> [maybe_format(FormatFun, {ClientId, Session})]
end;
Res ->
Res
end;
lookup_client({username, Username}, FormatFun) ->
lists:append([
lookup_client(Node, {username, Username}, FormatFun)
@ -633,6 +642,16 @@ create_banned(Banned) ->
delete_banned(Who) ->
emqx_banned:delete(Who).
%%--------------------------------------------------------------------
%% Internal exports
%%--------------------------------------------------------------------
lookup_running_client(ClientId, FormatFun) ->
lists:append([
lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- emqx:running_nodes()
]).
%%--------------------------------------------------------------------
%% Internal Functions.
%%--------------------------------------------------------------------

View File

@ -39,7 +39,13 @@
parse_pager_params/1,
parse_qstring/2,
init_query_result/0,
accumulate_query_rows/4
init_query_state/5,
reset_query_state/1,
accumulate_query_rows/4,
finalize_query/2,
mark_complete/2,
format_query_result/3,
maybe_collect_total_from_tail_nodes/2
]).
-ifdef(TEST).

View File

@ -701,26 +701,13 @@ list_clients(QString) ->
case maps:get(<<"node">>, QString, undefined) of
undefined ->
Options = #{fast_total_counting => true},
emqx_mgmt_api:cluster_query(
?CHAN_INFO_TAB,
QString,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2,
Options
);
list_clients_cluster_query(QString, Options);
Node0 ->
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query(
Node1,
?CHAN_INFO_TAB,
QStringWithoutNode,
?CLIENT_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_channel_info/2
);
QStringWithoutNode = maps:remove(<<"node">>, QString),
Options = #{},
list_clients_node_query(Node1, QStringWithoutNode, Options);
{error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}}
end
@ -854,6 +841,170 @@ do_unsubscribe(ClientID, Topic) ->
Res
end.
list_clients_cluster_query(QString, Options) ->
case emqx_mgmt_api:parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta = #{} ->
try
{_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
Nodes = emqx:running_nodes(),
ResultAcc = emqx_mgmt_api:init_query_result(),
QueryState = emqx_mgmt_api:init_query_state(
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_cluster_query(Nodes, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
catch
throw:{bad_value_type, {Key, ExpectedType, AcutalValue}} ->
{error, invalid_query_string_param, {Key, ExpectedType, AcutalValue}}
end
end.
%% adapted from `emqx_mgmt_api:do_cluster_query'
do_list_clients_cluster_query(
[Node | Tail] = Nodes,
QueryState0,
ResultAcc
) ->
case emqx_mgmt_api:do_query(Node, QueryState0) of
{error, Error} ->
{error, Node, Error};
{Rows, QueryState1 = #{complete := Complete0}} ->
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of
{enough, NResultAcc} ->
%% TODO: add persistent session count?
%% TODO: this may return `{error, _, _}'...
QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes(
Tail, QueryState1
),
QueryState = add_persistent_session_count(QueryState2),
Complete = Complete0 andalso Tail =:= [] andalso no_persistent_sessions(),
emqx_mgmt_api:finalize_query(
NResultAcc, emqx_mgmt_api:mark_complete(QueryState, Complete)
);
{more, NResultAcc} when not Complete0 ->
do_list_clients_cluster_query(Nodes, QueryState1, NResultAcc);
{more, NResultAcc} when Tail =/= [] ->
do_list_clients_cluster_query(
Tail, emqx_mgmt_api:reset_query_state(QueryState1), NResultAcc
);
{more, NResultAcc} ->
QueryState = add_persistent_session_count(QueryState1),
do_persistent_session_query(NResultAcc, QueryState)
end
end.
list_clients_node_query(Node, QString, Options) ->
case emqx_mgmt_api:parse_pager_params(QString) of
false ->
{error, page_limit_invalid};
Meta = #{} ->
{_CodCnt, NQString} = emqx_mgmt_api:parse_qstring(QString, ?CLIENT_QSCHEMA),
ResultAcc = emqx_mgmt_api:init_query_result(),
QueryState = emqx_mgmt_api:init_query_state(
?CHAN_INFO_TAB, NQString, fun ?MODULE:qs2ms/2, Meta, Options
),
Res = do_list_clients_node_query(Node, QueryState, ResultAcc),
emqx_mgmt_api:format_query_result(fun ?MODULE:format_channel_info/2, Meta, Res)
end.
add_persistent_session_count(QueryState0 = #{total := Totals0}) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
%% TODO: currently, counting persistent sessions can be not only costly (needs
%% to traverse the whole table), but also hard to deduplicate live connections
%% from it... So this count will possibly overshoot the true count of
%% sessions.
SessionCount = emqx_persistent_session_ds_state:session_count(),
Totals = Totals0#{undefined => SessionCount},
QueryState0#{total := Totals};
false ->
QueryState0
end;
add_persistent_session_count(QueryState) ->
QueryState.
%% adapted from `emqx_mgmt_api:do_node_query'
do_list_clients_node_query(
Node,
QueryState,
ResultAcc
) ->
case emqx_mgmt_api:do_query(Node, QueryState) of
{error, Error} ->
{error, Node, Error};
{Rows, NQueryState = #{complete := Complete}} ->
case emqx_mgmt_api:accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
FComplete = Complete andalso no_persistent_sessions(),
emqx_mgmt_api:finalize_query(
NResultAcc, emqx_mgmt_api:mark_complete(NQueryState, FComplete)
);
{more, NResultAcc} when Complete ->
do_persistent_session_query(NResultAcc, NQueryState);
{more, NResultAcc} ->
do_list_clients_node_query(Node, NQueryState, NResultAcc)
end
end.
init_persistent_session_iterator() ->
emqx_persistent_session_ds_state:make_session_iterator().
no_persistent_sessions() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
Cursor = init_persistent_session_iterator(),
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
{[], _} ->
true;
_ ->
false
end;
false ->
true
end.
do_persistent_session_query(ResultAcc, QueryState) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
do_persistent_session_query1(
ResultAcc,
QueryState,
init_persistent_session_iterator()
);
false ->
emqx_mgmt_api:finalize_query(ResultAcc, QueryState)
end.
do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% Since persistent session data is accessible from all nodes, there's no need to go
%% through all the nodes.
#{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = remove_live_sessions(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
{more, NResultAcc} when Iter =:= '$end_of_table' ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
{more, NResultAcc} ->
do_persistent_session_query1(NResultAcc, QueryState, Iter)
end.
remove_live_sessions(Rows) ->
lists:filtermap(
fun({ClientId, _Session}) ->
case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of
[] ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}};
[_ | _] ->
false
end
end,
Rows
).
%%--------------------------------------------------------------------
%% QueryString to Match Spec
@ -928,7 +1079,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
%% format funcs
format_channel_info(ChannInfo = {_, _ClientInfo, _ClientStats}) ->
format_channel_info(node(), ChannInfo).
%% channel info from ETS table (live and/or in-memory session)
format_channel_info(node(), ChannInfo);
format_channel_info({ClientId, PSInfo}) ->
%% offline persistent session
format_persistent_session_info(ClientId, PSInfo).
format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
Node = maps:get(node, ClientInfo0, WhichNode),
@ -986,7 +1141,29 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
maps:without(RemoveList, ClientInfoMap),
TimesKeys
)
).
);
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}) ->
format_persistent_session_info(ClientId, PSInfo0).
format_persistent_session_info(ClientId, PSInfo0) ->
Metadata = maps:get(metadata, PSInfo0, #{}),
PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
CreatedAt = maps:get(created_at, PSInfo1),
PSInfo2 = convert_expiry_interval_unit(PSInfo1),
PSInfo3 = PSInfo2#{
clientid => ClientId,
connected => false,
connected_at => CreatedAt,
ip_address => undefined,
is_persistent => true,
port => undefined
},
PSInfo = lists:foldl(
fun result_format_time_fun/2,
PSInfo3,
[created_at, connected_at]
),
result_format_undefined_to_null(PSInfo).
%% format func helpers
take_maps_from_inner(_Key, Value, Current) when is_map(Value) ->

View File

@ -18,10 +18,27 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
AllTCs = emqx_common_test_helpers:all(?MODULE),
[
{group, persistent_sessions}
| AllTCs -- persistent_session_testcases()
].
groups() ->
[{persistent_sessions, persistent_session_testcases()}].
persistent_session_testcases() ->
[
t_persistent_sessions1,
t_persistent_sessions2,
t_persistent_sessions3,
t_persistent_sessions4,
t_persistent_sessions5
].
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(),
@ -30,6 +47,33 @@ init_per_suite(Config) ->
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite().
init_per_group(persistent_sessions, Config) ->
AppSpecs = [
{emqx, "session_persistence.enable = true"},
emqx_management
],
Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 18084 }"
),
Cluster = [
{emqx_mgmt_api_clients_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}},
{emqx_mgmt_api_clients_SUITE2, #{role => core, apps => AppSpecs}}
],
Nodes = emqx_cth_cluster:start(
Cluster,
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{nodes, Nodes} | Config];
init_per_group(_Group, Config) ->
Config.
end_per_group(persistent_sessions, Config) ->
Nodes = ?config(nodes, Config),
emqx_cth_cluster:stop(Nodes),
ok;
end_per_group(_Group, _Config) ->
ok.
t_clients(_) ->
process_flag(trap_exit, true),
@ -171,6 +215,290 @@ t_clients(_) ->
AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path),
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1).
t_persistent_sessions1(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
%% Scenario 1
%% 1) Client connects and is listed as connected.
?tp(notice, "scenario 1", #{}),
O = #{api_port => APIPort},
ClientId = <<"c1">>,
C1 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
%% 2) Client disconnects and is listed as disconnected.
ok = emqtt:disconnect(C1),
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
%% 3) Client reconnects and is listed as connected.
C2 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
%% 4) Client disconnects.
ok = emqtt:stop(C2),
%% 5) Session is GC'ed, client is removed from list.
?tp(notice, "gc", #{}),
%% simulate GC
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort)
)
),
ok
end,
[]
),
ok.
t_persistent_sessions2(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
%% Scenario 2
%% 1) Client connects and is listed as connected.
?tp(notice, "scenario 2", #{}),
O = #{api_port => APIPort},
ClientId = <<"c2">>,
C1 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
unlink(C1),
%% 2) Client connects to the same node and takes over, listed only once.
C2 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort)
)
),
ok
end,
[]
),
ok.
t_persistent_sessions3(Config) ->
[N1, N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
%% Scenario 3
%% 1) Client connects and is listed as connected.
?tp(notice, "scenario 3", #{}),
O = #{api_port => APIPort},
ClientId = <<"c3">>,
C1 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
unlink(C1),
%% 2) Client connects to *another node* and takes over, listed only once.
C2 = connect_client(#{port => Port2, clientid => ClientId}),
assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
%% Doesn't show up in the other node while alive
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort, "node=" ++ atom_to_list(N1))
)
),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
end,
[]
),
ok.
t_persistent_sessions4(Config) ->
[N1, N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
%% Scenario 4
%% 1) Client connects and is listed as connected.
?tp(notice, "scenario 4", #{}),
O = #{api_port => APIPort},
ClientId = <<"c4">>,
C1 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
%% 2) Client disconnects and is listed as disconnected.
ok = emqtt:stop(C1),
%% While disconnected, shows up in both nodes.
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
assert_single_client(O#{node => N2, clientid => ClientId, status => disconnected}),
%% 3) Client reconnects to *another node* and is listed as connected once.
C2 = connect_client(#{port => Port2, clientid => ClientId}),
assert_single_client(O#{node => N2, clientid => ClientId, status => connected}),
%% Doesn't show up in the other node while alive
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort, "node=" ++ atom_to_list(N1))
)
),
ok = emqtt:stop(C2),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
end,
[]
),
ok.
t_persistent_sessions5(Config) ->
[N1, N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
%% Pagination with mixed clients
ClientId1 = <<"c5">>,
ClientId2 = <<"c6">>,
ClientId3 = <<"c7">>,
ClientId4 = <<"c8">>,
%% persistent
C1 = connect_client(#{port => Port1, clientid => ClientId1}),
C2 = connect_client(#{port => Port2, clientid => ClientId2}),
%% in-memory
C3 = connect_client(#{
port => Port1, clientid => ClientId3, expiry => 0, clean_start => true
}),
C4 = connect_client(#{
port => Port2, clientid => ClientId4, expiry => 0, clean_start => true
}),
P1 = list_request(APIPort, "limit=3&page=1"),
P2 = list_request(APIPort, "limit=3&page=2"),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_, _, _],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"hasnext">> := true
}
}}},
P1
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"hasnext">> := false
}
}}},
P2
),
{ok, {_, _, #{<<"data">> := R1}}} = P1,
{ok, {_, _, #{<<"data">> := R2}}} = P2,
?assertEqual(
lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R1 ++ R2))
),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_, _],
<<"meta">> := #{
%% TODO: if/when we fix the persistent session count, this
%% should be 4.
<<"count">> := 6,
<<"hasnext">> := true
}
}}},
list_request(APIPort, "limit=2&page=1")
),
%% Disconnect persistent sessions
lists:foreach(fun emqtt:stop/1, [C1, C2]),
P3 =
?retry(200, 10, begin
P3_ = list_request(APIPort, "limit=3&page=1"),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_, _, _],
<<"meta">> := #{
<<"count">> := 4,
<<"hasnext">> := true
}
}}},
P3_
),
P3_
end),
P4 =
?retry(200, 10, begin
P4_ = list_request(APIPort, "limit=3&page=2"),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"data">> := [_],
<<"meta">> := #{
<<"count">> := 4,
<<"hasnext">> := false
}
}}},
P4_
),
P4_
end),
{ok, {_, _, #{<<"data">> := R3}}} = P3,
{ok, {_, _, #{<<"data">> := R4}}} = P4,
?assertEqual(
lists:sort([ClientId1, ClientId2, ClientId3, ClientId4]),
lists:sort(lists:map(fun(#{<<"clientid">> := CId}) -> CId end, R3 ++ R4))
),
lists:foreach(fun emqtt:stop/1, [C3, C4]),
ok
end,
[]
),
ok.
t_clients_bad_value_type(_) ->
%% get /clients
AuthHeader = [emqx_common_test_http:default_auth_header()],
@ -442,3 +770,111 @@ time_string_to_epoch(DateTime, Unit) when is_binary(DateTime) ->
binary_to_list(DateTime), [{unit, Unit}]
)
end.
get_mqtt_port(Node, Type) ->
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
Port.
request(Method, Path, Params) ->
request(Method, Path, Params, _QueryParams = "").
request(Method, Path, Params, QueryParams) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, QueryParams, AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
Body = maybe_json_decode(Body0),
{ok, {Status, Headers, Body}};
{error, {Status, Headers, Body0}} ->
Body =
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
Msg = maybe_json_decode(Msg0),
Decoded0#{<<"message">> := Msg};
{ok, Decoded0} ->
Decoded0;
{error, _} ->
Body0
end,
{error, {Status, Headers, Body}};
Error ->
Error
end.
maybe_json_decode(X) ->
case emqx_utils_json:safe_decode(X, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> X
end.
list_request(Port) ->
list_request(Port, _QueryParams = "").
list_request(Port, QueryParams) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
request(get, Path, [], QueryParams).
lookup_request(ClientId) ->
lookup_request(ClientId, 18083).
lookup_request(ClientId, Port) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
request(get, Path, []).
assert_single_client(Opts) ->
#{
api_port := APIPort,
clientid := ClientId,
node := Node,
status := Connected
} = Opts,
IsConnected =
case Connected of
connected -> true;
disconnected -> false
end,
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
list_request(APIPort)
)
),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}},
list_request(APIPort, "node=" ++ atom_to_list(Node)),
#{node => Node}
)
),
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
lookup_request(ClientId, APIPort)
),
ok.
connect_client(Opts) ->
Defaults = #{
expiry => 30,
clean_start => false
},
#{
port := Port,
clientid := ClientId,
clean_start := CleanStart,
expiry := EI
} = maps:merge(Defaults, Opts),
{ok, C} = emqtt:start_link([
{port, Port},
{proto_ver, v5},
{clientid, ClientId},
{clean_start, CleanStart},
{properties, #{'Session-Expiry-Interval' => EI}}
]),
{ok, _} = emqtt:connect(C),
C.

View File

@ -50,6 +50,21 @@ set_special_configs(emqx_dashboard) ->
set_special_configs(_App) ->
ok.
-spec emqx_dashboard() -> emqx_cth_suite:appspec().
emqx_dashboard() ->
emqx_dashboard("dashboard.listeners.http { enable = true, bind = 18083 }").
emqx_dashboard(Config) ->
{emqx_dashboard, #{
config => Config,
before_start => fun() ->
{ok, _} = emqx_common_test_http:create_default_app()
end,
after_start => fun() ->
true = emqx_dashboard_listener:is_ready(infinity)
end
}}.
%% there is no difference between the 'request' and 'request_api'
%% the 'request' is only to be compatible with the 'emqx_dashboard_api_test_helpers:request'
request(Method, Url) ->

View File

@ -0,0 +1,3 @@
Now disconnected persistent sessions are returned in the `GET /clients` and `GET /client/:clientid` HTTP APIs.
Known issue: the total count returned by this API may overestimate the total number of clients.