diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index a6038bcb7..8cad67695 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -164,6 +164,14 @@ fields(limit) -> ]), Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50}, [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; +fields(cursor) -> + Desc = <<"Opaque value representing the current iteration state.">>, + Meta = #{default => none, in => query, desc => Desc}, + [{cursor, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}]; +fields(cursor_response) -> + Desc = <<"Opaque value representing the current iteration state.">>, + Meta = #{desc => Desc, required => false}, + [{cursor, hoconsc:mk(binary(), Meta)}]; fields(count) -> Desc = << "Total number of records matching the query.
" @@ -197,6 +205,8 @@ fields(start) -> [{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}]; fields(meta) -> fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext); +fields(meta_with_cursor) -> + fields(count) ++ fields(hasnext) ++ fields(cursor_response); fields(continuation_meta) -> fields(start) ++ fields(position). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 07f407430..9175f91ff 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -38,6 +38,7 @@ -export([ clients/2, + list_clients_v2/2, kickout_clients/2, client/2, subscriptions/2, @@ -63,6 +64,10 @@ %% for batch operation -export([do_subscribe/3]). +-ifdef(TEST). +-export([parse_cursor/2, serialize_cursor/1]). +-endif. + -define(TAGS, [<<"Clients">>]). -define(CLIENT_QSCHEMA, [ @@ -95,6 +100,14 @@ message => <<"Client connection has been shutdown">> }). +%% tags +-define(CURSOR_VSN1, 1). +-define(CURSOR_TYPE_ETS, 1). +-define(CURSOR_TYPE_DS, 2). +%% field keys +-define(CURSOR_ETS_NODE_IDX, 1). +-define(CURSOR_ETS_CONT, 2). + namespace() -> undefined. api_spec() -> @@ -103,6 +116,7 @@ api_spec() -> paths() -> [ "/clients", + "/clients_v2", "/clients/kickout/bulk", "/clients/:clientid", "/clients/:clientid/authorization/cache", @@ -117,115 +131,38 @@ paths() -> "/sessions_count" ]. +schema("/clients_v2") -> + #{ + 'operationId' => list_clients_v2, + get => #{ + security => [], + description => ?DESC(list_clients), + tags => ?TAGS, + parameters => fields(list_clients_v2_inputs), + responses => #{ + 200 => + emqx_dashboard_swagger:schema_with_example(?R_REF(list_clients_v2_response), #{ + <<"data">> => [client_example()], + <<"meta">> => #{ + <<"count">> => 1, + <<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>, + <<"hasnext">> => true + } + }), + 400 => + emqx_dashboard_swagger:error_codes( + ['INVALID_PARAMETER'], <<"Invalid parameters">> + ) + } + } + }; schema("/clients") -> #{ 'operationId' => clients, get => #{ description => ?DESC(list_clients), tags => ?TAGS, - parameters => [ - hoconsc:ref(emqx_dashboard_swagger, page), - hoconsc:ref(emqx_dashboard_swagger, limit), - {node, - hoconsc:mk(binary(), #{ - in => query, - required => false, - desc => <<"Node name">>, - example => <<"emqx@127.0.0.1">> - })}, - {username, - hoconsc:mk(hoconsc:array(binary()), #{ - in => query, - required => false, - desc => << - "User name, multiple values can be specified by" - " repeating the parameter: username=u1&username=u2" - >> - })}, - {ip_address, - hoconsc:mk(binary(), #{ - in => query, - required => false, - desc => <<"Client's IP address">>, - example => <<"127.0.0.1">> - })}, - {conn_state, - hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{ - in => query, - required => false, - desc => - <<"The current connection status of the client, ", - "the possible values are connected,idle,disconnected">> - })}, - {clean_start, - hoconsc:mk(boolean(), #{ - in => query, - required => false, - description => <<"Whether the client uses a new session">> - })}, - {proto_ver, - hoconsc:mk(binary(), #{ - in => query, - required => false, - desc => <<"Client protocol version">> - })}, - {like_clientid, - hoconsc:mk(binary(), #{ - in => query, - required => false, - desc => <<"Fuzzy search `clientid` as substring">> - })}, - {like_username, - hoconsc:mk(binary(), #{ - in => query, - required => false, - desc => <<"Fuzzy search `username` as substring">> - })}, - {gte_created_at, - hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ - in => query, - required => false, - desc => - <<"Search client session creation time by greater", - " than or equal method, rfc3339 or timestamp(millisecond)">> - })}, - {lte_created_at, - hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ - in => query, - required => false, - desc => - <<"Search client session creation time by less", - " than or equal method, rfc3339 or timestamp(millisecond)">> - })}, - {gte_connected_at, - hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ - in => query, - required => false, - desc => << - "Search client connection creation time by greater" - " than or equal method, rfc3339 or timestamp(epoch millisecond)" - >> - })}, - {lte_connected_at, - hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ - in => query, - required => false, - desc => << - "Search client connection creation time by less" - " than or equal method, rfc3339 or timestamp(millisecond)" - >> - })}, - {clientid, - hoconsc:mk(hoconsc:array(binary()), #{ - in => query, - required => false, - desc => << - "Client ID, multiple values can be specified by" - " repeating the parameter: clientid=c1&clientid=c2" - >> - })}, - ?R_REF(requested_client_fields) - ], + parameters => fields(list_clients_v1_inputs), responses => #{ 200 => emqx_dashboard_swagger:schema_with_example(?R_REF(clients), #{ @@ -453,11 +390,129 @@ schema("/sessions_count") -> } }. +fields(list_clients_v2_inputs) -> + [ + hoconsc:ref(emqx_dashboard_swagger, cursor) + | fields(common_list_clients_input) + ]; +fields(list_clients_v1_inputs) -> + [ + hoconsc:ref(emqx_dashboard_swagger, page), + {node, + hoconsc:mk(binary(), #{ + in => query, + required => false, + desc => <<"Node name">>, + example => <<"emqx@127.0.0.1">> + })} + | fields(common_list_clients_input) + ]; +fields(common_list_clients_input) -> + [ + hoconsc:ref(emqx_dashboard_swagger, limit), + {username, + hoconsc:mk(hoconsc:array(binary()), #{ + in => query, + required => false, + desc => << + "User name, multiple values can be specified by" + " repeating the parameter: username=u1&username=u2" + >> + })}, + {ip_address, + hoconsc:mk(binary(), #{ + in => query, + required => false, + desc => <<"Client's IP address">>, + example => <<"127.0.0.1">> + })}, + {conn_state, + hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{ + in => query, + required => false, + desc => + <<"The current connection status of the client, ", + "the possible values are connected,idle,disconnected">> + })}, + {clean_start, + hoconsc:mk(boolean(), #{ + in => query, + required => false, + description => <<"Whether the client uses a new session">> + })}, + {proto_ver, + hoconsc:mk(binary(), #{ + in => query, + required => false, + desc => <<"Client protocol version">> + })}, + {like_clientid, + hoconsc:mk(binary(), #{ + in => query, + required => false, + desc => <<"Fuzzy search `clientid` as substring">> + })}, + {like_username, + hoconsc:mk(binary(), #{ + in => query, + required => false, + desc => <<"Fuzzy search `username` as substring">> + })}, + {gte_created_at, + hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + required => false, + desc => + <<"Search client session creation time by greater", + " than or equal method, rfc3339 or timestamp(millisecond)">> + })}, + {lte_created_at, + hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + required => false, + desc => + <<"Search client session creation time by less", + " than or equal method, rfc3339 or timestamp(millisecond)">> + })}, + {gte_connected_at, + hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + required => false, + desc => << + "Search client connection creation time by greater" + " than or equal method, rfc3339 or timestamp(epoch millisecond)" + >> + })}, + {lte_connected_at, + hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{ + in => query, + required => false, + desc => << + "Search client connection creation time by less" + " than or equal method, rfc3339 or timestamp(millisecond)" + >> + })}, + {clientid, + hoconsc:mk(hoconsc:array(binary()), #{ + in => query, + required => false, + desc => << + "Client ID, multiple values can be specified by" + " repeating the parameter: clientid=c1&clientid=c2" + >> + })}, + ?R_REF(requested_client_fields) + ]; fields(clients) -> [ {data, hoconsc:mk(hoconsc:array(?REF(client)), #{})}, {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})} ]; +fields(list_clients_v2_response) -> + [ + {data, hoconsc:mk(hoconsc:array(?REF(client)), #{})}, + {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta_with_cursor), #{})} + ]; fields(client) -> [ {awaiting_rel_cnt, @@ -890,6 +945,218 @@ list_clients(QString) -> {200, Response} end. +list_clients_v2(get, #{query_string := QString0}) -> + Nodes = emqx:running_nodes(), + case maps:get(<<"cursor">>, QString0, none) of + none -> + Cursor = initial_ets_cursor(Nodes), + do_list_clients_v2(Nodes, Cursor, QString0); + CursorBin when is_binary(CursorBin) -> + case parse_cursor(CursorBin, Nodes) of + {ok, Cursor} -> + do_list_clients_v2(Nodes, Cursor, QString0); + {error, bad_cursor} -> + ?BAD_REQUEST(<<"bad cursor">>) + end + end. + +do_list_clients_v2(Nodes, Cursor, QString0) -> + Limit = maps:get(<<"limit">>, QString0, 100), + Acc = #{ + rows => [], + n => 0, + limit => Limit + }, + do_list_clients_v2(Nodes, Cursor, QString0, Acc). + +do_list_clients_v2(_Nodes, Cursor = done, _QString, Acc) -> + format_results(Acc, Cursor); +do_list_clients_v2(Nodes, Cursor = #{type := ?CURSOR_TYPE_ETS, node := Node}, QString0, Acc0) -> + {Rows, NewCursor} = do_ets_select(Nodes, QString0, Cursor), + Acc1 = maps:update_with(rows, fun(Rs) -> [{Node, Rows} | Rs] end, Acc0), + Acc = #{limit := Limit, n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), + case N >= Limit of + true -> + format_results(Acc, NewCursor); + false -> + do_list_clients_v2(Nodes, NewCursor, QString0, Acc) + end; +do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0}, QString0, Acc0) -> + #{limit := Limit} = Acc0, + {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), + NewCursor = next_ds_cursor(Iter), + Rows1 = drop_live_and_expired(Rows0), + Rows = maybe_run_fuzzy_filter(Rows1, QString0), + Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0), + Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), + case N >= Limit of + true -> + format_results(Acc, NewCursor); + false -> + do_list_clients_v2(Nodes, NewCursor, QString0, Acc) + end. + +format_results(Acc, Cursor) -> + #{ + rows := NodeRows, + n := N + } = Acc, + Meta = + case Cursor of + done -> + #{ + hasnext => false, + count => N + }; + _ -> + #{ + hasnext => true, + count => N, + cursor => serialize_cursor(Cursor) + } + end, + Resp = #{ + meta => Meta, + data => [ + format_channel_info(Node, Row) + || {Node, Rows} <- NodeRows, + Row <- Rows + ] + }, + ?OK(Resp). + +do_ets_select(Nodes, QString0, #{node := Node, node_idx := NodeIdx, cont := Cont} = _Cursor) -> + {_, QString1} = emqx_mgmt_api:parse_qstring(QString0, ?CLIENT_QSCHEMA), + Limit = maps:get(<<"limit">>, QString0, 10), + {Rows, #{cont := NewCont, node_idx := NewNodeIdx}} = ets_select( + QString1, Limit, Node, NodeIdx, Cont + ), + {Rows, next_ets_cursor(Nodes, NewNodeIdx, NewCont)}. + +maybe_run_fuzzy_filter(Rows, QString0) -> + {_, {_, FuzzyQString}} = emqx_mgmt_api:parse_qstring(QString0, ?CLIENT_QSCHEMA), + FuzzyFilterFn = fuzzy_filter_fun(FuzzyQString), + case FuzzyFilterFn of + undefined -> + Rows; + {Fn, Args} -> + lists:filter( + fun(E) -> erlang:apply(Fn, [E | Args]) end, + Rows + ) + end. + +initial_ets_cursor([Node | _Rest] = _Nodes) -> + #{ + type => ?CURSOR_TYPE_ETS, + node => Node, + node_idx => 1, + cont => undefined + }. + +initial_ds_cursor() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + #{ + type => ?CURSOR_TYPE_DS, + iterator => init_persistent_session_iterator() + }; + false -> + done + end. + +next_ets_cursor(Nodes, NodeIdx, Cont) -> + case NodeIdx > length(Nodes) of + true -> + initial_ds_cursor(); + false -> + Node = lists:nth(NodeIdx, Nodes), + #{ + type => ?CURSOR_TYPE_ETS, + node_idx => NodeIdx, + node => Node, + cont => Cont + } + end. + +next_ds_cursor('$end_of_table') -> + done; +next_ds_cursor(Iter) -> + #{ + type => ?CURSOR_TYPE_DS, + iterator => Iter + }. + +parse_cursor(CursorBin, Nodes) -> + try base64:decode(CursorBin, #{mode => urlsafe, padding => false}) of + Bin -> + parse_cursor1(Bin, Nodes) + catch + _:_ -> + {error, bad_cursor} + end. + +parse_cursor1(CursorBin, Nodes) -> + try binary_to_term(CursorBin, [safe]) of + [ + ?CURSOR_VSN1, + ?CURSOR_TYPE_ETS, + #{?CURSOR_ETS_NODE_IDX := NodeIdx, ?CURSOR_ETS_CONT := Cont} + ] -> + case NodeIdx > length(Nodes) of + true -> + {error, bad_cursor}; + false -> + Node = lists:nth(NodeIdx, Nodes), + Cursor = #{ + type => ?CURSOR_TYPE_ETS, + node => Node, + node_idx => NodeIdx, + cont => Cont + }, + {ok, Cursor} + end; + [?CURSOR_VSN1, ?CURSOR_TYPE_DS, DSIter] -> + Cursor = #{type => ?CURSOR_TYPE_DS, iterator => DSIter}, + {ok, Cursor}; + _ -> + {error, bad_cursor} + catch + error:badarg -> + {error, bad_cursor} + end. + +serialize_cursor(#{type := ?CURSOR_TYPE_ETS, node_idx := NodeIdx, cont := Cont}) -> + Cursor0 = [ + ?CURSOR_VSN1, + ?CURSOR_TYPE_ETS, + #{?CURSOR_ETS_NODE_IDX => NodeIdx, ?CURSOR_ETS_CONT => Cont} + ], + Bin = term_to_binary(Cursor0, [{compressed, 9}]), + base64:encode(Bin, #{mode => urlsafe, padding => false}); +serialize_cursor(#{type := ?CURSOR_TYPE_DS, iterator := Iter}) -> + Cursor0 = [?CURSOR_VSN1, ?CURSOR_TYPE_DS, Iter], + Bin = term_to_binary(Cursor0, [{compressed, 9}]), + base64:encode(Bin, #{mode => urlsafe, padding => false}). + +%% An adapter function so we can reutilize all the logic in `emqx_mgmt_api' for +%% selecting/fuzzy filters, and also reutilize its BPAPI for selecting rows. +ets_select(NQString, Limit, Node, NodeIdx, Cont) -> + QueryState0 = emqx_mgmt_api:init_query_state( + ?CHAN_INFO_TAB, + NQString, + fun ?MODULE:qs2ms/2, + _Meta = #{page => unused, limit => Limit}, + _Options = #{} + ), + QueryState = QueryState0#{continuation => Cont}, + case emqx_mgmt_api:do_query(Node, QueryState) of + {Rows, #{complete := true}} -> + {Rows, #{node_idx => NodeIdx + 1, cont => undefined}}; + {Rows, #{continuation := NCont}} -> + {Rows, #{node_idx => NodeIdx, cont => NCont}} + end. + lookup(#{clientid := ClientID}) -> case emqx_mgmt:lookup_client({clientid, ClientID}, ?FORMAT_FUN) of [] -> @@ -1410,13 +1677,25 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; -run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) -> +run_fuzzy_filter( + Row = {_, #{metadata := #{clientinfo := ClientInfo}}}, + [{Key, like, SubStr} | RestArgs] +) -> + %% Row from DS + run_fuzzy_filter1(ClientInfo, Key, SubStr) andalso + run_fuzzy_filter(Row, RestArgs); +run_fuzzy_filter(Row = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | RestArgs]) -> + %% Row from ETS + run_fuzzy_filter1(ClientInfo, Key, SubStr) andalso + run_fuzzy_filter(Row, RestArgs). + +run_fuzzy_filter1(ClientInfo, Key, SubStr) -> Val = case maps:get(Key, ClientInfo, <<>>) of undefined -> <<>>; V -> V end, - binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_filter(E, Fuzzy). + binary:match(Val, SubStr) /= nomatch. %%-------------------------------------------------------------------- %% format funcs diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 2f4804158..ebda34bc2 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -19,8 +19,9 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_router.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -47,7 +48,8 @@ persistent_session_testcases() -> t_persistent_sessions2, t_persistent_sessions3, t_persistent_sessions4, - t_persistent_sessions5 + t_persistent_sessions5, + t_list_clients_v2 ]. client_msgs_testcases() -> [ @@ -56,11 +58,23 @@ client_msgs_testcases() -> ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), - Config. + ok = snabbkaffe:start_trace(), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. init_per_group(persistent_sessions, Config) -> AppSpecs = [ @@ -109,9 +123,12 @@ end_per_testcase(TC, _Config) when ?LINE, fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end, 5000 - ); + ), + ok = snabbkaffe:stop(), + ok; end_per_testcase(_TC, _Config) -> - ok = snabbkaffe:stop(). + ok = snabbkaffe:stop(), + ok. t_clients(_) -> process_flag(trap_exit, true), @@ -522,6 +539,12 @@ t_persistent_sessions5(Config) -> ), lists:foreach(fun emqtt:stop/1, [C3, C4]), + lists:foreach( + fun(ClientId) -> + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]) + end, + [ClientId1, ClientId2, ClientId3, ClientId4] + ), ok end, @@ -1415,6 +1438,319 @@ t_subscribe_shared_topic_nl(_Config) -> PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) ). +t_list_clients_v2(Config) -> + [N1, N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + Port2 = get_mqtt_port(N2, tcp), + + ?check_trace( + begin + ClientId1 = <<"ca1">>, + ClientId2 = <<"c2">>, + ClientId3 = <<"c3">>, + ClientId4 = <<"ca4">>, + ClientId5 = <<"ca5">>, + ClientId6 = <<"c6">>, + AllClientIds = [ + ClientId1, + ClientId2, + ClientId3, + ClientId4, + ClientId5, + ClientId6 + ], + C1 = connect_client(#{port => Port1, clientid => ClientId1, clean_start => true}), + C2 = connect_client(#{port => Port2, clientid => ClientId2, clean_start => true}), + C3 = connect_client(#{port => Port1, clientid => ClientId3, clean_start => true}), + C4 = connect_client(#{port => Port2, clientid => ClientId4, clean_start => true}), + %% in-memory clients + C5 = connect_client(#{ + port => Port1, clientid => ClientId5, expiry => 0, clean_start => true + }), + C6 = connect_client(#{ + port => Port2, clientid => ClientId6, expiry => 0, clean_start => true + }), + %% offline persistent clients + ok = emqtt:stop(C3), + ok = emqtt:stop(C4), + + %% one by one + QueryParams1 = #{limit => "1"}, + Res1 = list_all_v2(APIPort, QueryParams1), + ?assertMatch( + [ + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 1 + } + } + ], + Res1 + ), + assert_contains_clientids(Res1, AllClientIds), + + %% Reusing the same cursors yield the same pages + traverse_in_reverse_v2(APIPort, QueryParams1, Res1), + + %% paging + QueryParams2 = #{limit => "4"}, + Res2 = list_all_v2(APIPort, QueryParams2), + ?assertMatch( + [ + #{ + <<"data">> := [_, _, _, _], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 4, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_, _], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 2 + } + } + ], + Res2 + ), + assert_contains_clientids(Res2, AllClientIds), + traverse_in_reverse_v2(APIPort, QueryParams2, Res2), + + QueryParams3 = #{limit => "2"}, + Res3 = list_all_v2(APIPort, QueryParams3), + ?assertMatch( + [ + #{ + <<"data">> := [_, _], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 2, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_, _], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 2, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_, _], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 2 + } + } + ], + Res3 + ), + assert_contains_clientids(Res3, AllClientIds), + traverse_in_reverse_v2(APIPort, QueryParams3, Res3), + + %% fuzzy filters + QueryParams4 = #{limit => "100", like_clientid => "ca"}, + Res4 = list_all_v2(APIPort, QueryParams4), + ?assertMatch( + [ + #{ + <<"data">> := [_, _, _], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 3 + } + } + ], + Res4 + ), + assert_contains_clientids(Res4, [ClientId1, ClientId4, ClientId5]), + traverse_in_reverse_v2(APIPort, QueryParams4, Res4), + QueryParams5 = #{limit => "1", like_clientid => "ca"}, + Res5 = list_all_v2(APIPort, QueryParams5), + ?assertMatch( + [ + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := true, + <<"count">> := 1, + <<"cursor">> := _ + } + }, + #{ + <<"data">> := [_], + <<"meta">> := + #{ + <<"hasnext">> := false, + <<"count">> := 1 + } + } + ], + Res5 + ), + assert_contains_clientids(Res5, [ClientId1, ClientId4, ClientId5]), + traverse_in_reverse_v2(APIPort, QueryParams5, Res5), + + lists:foreach( + fun(C) -> + {_, {ok, _}} = + ?wait_async_action( + emqtt:stop(C), + #{?snk_kind := emqx_cm_clean_down} + ) + end, + [C1, C2, C5, C6] + ), + + %% Verify that a malicious cursor that could generate an atom on the node is + %% rejected + EvilAtomBin0 = <<131, 100, 0, 5, "some_atom_that_doesnt_exist_on_the_remote_node">>, + EvilAtomBin = base64:encode(EvilAtomBin0, #{mode => urlsafe, padding => false}), + + ?assertMatch( + {error, {{_, 400, _}, _, #{<<"message">> := <<"bad cursor">>}}}, + list_v2_request(APIPort, #{limit => "1", cursor => EvilAtomBin}) + ), + %% Verify that the atom was not created + erpc:call(N1, fun() -> + ?assertError(badarg, binary_to_term(EvilAtomBin0, [safe])) + end), + ?assert(is_atom(binary_to_term(EvilAtomBin0))), + + lists:foreach( + fun(ClientId) -> + ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]) + end, + AllClientIds + ), + + ok + end, + [] + ), + ok. + +t_cursor_serde_prop(_Config) -> + ?assert(proper:quickcheck(cursor_serde_prop(), [{numtests, 100}, {to_file, user}])). + +cursor_serde_prop() -> + ?FORALL( + NumNodes, + range(1, 10), + ?FORALL( + Cursor, + list_clients_cursor_gen(NumNodes), + begin + Nodes = lists:seq(1, NumNodes), + Bin = emqx_mgmt_api_clients:serialize_cursor(Cursor), + Res = emqx_mgmt_api_clients:parse_cursor(Bin, Nodes), + ?WHENFAIL( + ct:pal("original:\n ~p\nroundtrip:\n ~p", [Cursor, Res]), + {ok, Cursor} =:= Res + ) + end + ) + ). + +list_clients_cursor_gen(NumNodes) -> + oneof([ + lists_clients_ets_cursor_gen(NumNodes), + lists_clients_ds_cursor_gen() + ]). + +-define(CURSOR_TYPE_ETS, 1). +-define(CURSOR_TYPE_DS, 2). + +lists_clients_ets_cursor_gen(NumNodes) -> + ?LET( + {NodeIdx, Cont}, + {range(1, NumNodes), oneof([undefined, tuple()])}, + #{ + type => ?CURSOR_TYPE_ETS, + node => NodeIdx, + node_idx => NodeIdx, + cont => Cont + } + ). + +lists_clients_ds_cursor_gen() -> + ?LET( + Iter, + oneof(['$end_of_table', list(term())]), + #{ + type => ?CURSOR_TYPE_DS, + iterator => Iter + } + ). + time_string_to_epoch_millisecond(DateTime) -> time_string_to_epoch(DateTime, millisecond). @@ -1472,6 +1808,31 @@ list_request(Port, QueryParams) -> Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), request(get, Path, [], QueryParams). +list_v2_request(Port, QueryParams = #{}) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]), + QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))), + request(get, Path, [], QS). + +list_all_v2(Port, QueryParams = #{}) -> + do_list_all_v2(Port, QueryParams, _Acc = []). + +do_list_all_v2(Port, QueryParams, Acc) -> + case list_v2_request(Port, QueryParams) of + {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"cursor">> := Cursor}}}} -> + do_list_all_v2(Port, QueryParams#{cursor => Cursor}, [Resp | Acc]); + {ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"hasnext">> := false}}}} -> + lists:reverse([Resp | Acc]); + Other -> + error( + {unexpected_response, #{ + acc_so_far => Acc, + response => Other, + query_params => QueryParams + }} + ) + end. + lookup_request(ClientId) -> lookup_request(ClientId, 18083). @@ -1535,3 +1896,44 @@ connect_client(Opts) -> ]), {ok, _} = emqtt:connect(C), C. + +assert_contains_clientids(Results, ExpectedClientIds) -> + ContainedClientIds = [ + ClientId + || #{<<"data">> := Rows} <- Results, + #{<<"clientid">> := ClientId} <- Rows + ], + ?assertEqual( + lists:sort(ExpectedClientIds), + lists:sort(ContainedClientIds), + #{results => Results} + ). + +traverse_in_reverse_v2(APIPort, QueryParams0, Results) -> + Cursors0 = + lists:map( + fun(#{<<"meta">> := Meta}) -> + maps:get(<<"cursor">>, Meta, <<"wontbeused">>) + end, + Results + ), + Cursors1 = [<<"none">> | lists:droplast(Cursors0)], + DirectOrderClientIds = [ + ClientId + || #{<<"data">> := Rows} <- Results, + #{<<"clientid">> := ClientId} <- Rows + ], + ReverseCursors = lists:reverse(Cursors1), + do_traverse_in_reverse_v2( + APIPort, QueryParams0, ReverseCursors, DirectOrderClientIds, _Acc = [] + ). + +do_traverse_in_reverse_v2(_APIPort, _QueryParams0, _Cursors = [], DirectOrderClientIds, Acc) -> + ?assertEqual(DirectOrderClientIds, Acc); +do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderClientIds, Acc) -> + QueryParams = QueryParams0#{cursor => Cursor}, + Res0 = list_v2_request(APIPort, QueryParams), + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0), + {ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0, + ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows], + do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc). diff --git a/changes/ce/feat-12798.en.md b/changes/ce/feat-12798.en.md new file mode 100644 index 000000000..a3b46f5e6 --- /dev/null +++ b/changes/ce/feat-12798.en.md @@ -0,0 +1 @@ +Added new `GET /api/v5/clients_v2` API that uses cursors instead of page numbers for pagination. This should be more efficient than the old API endpoint, which currently traverses tables multiple times.