Merge pull request #12798 from thalesmg/ds-client-api-v2-m-20240327

feat(client mgmt api): add cursor-based list API
This commit is contained in:
Thales Macedo Garitezi 2024-04-04 15:10:49 -03:00 committed by GitHub
commit 217b35bce5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 805 additions and 113 deletions

View File

@ -164,6 +164,14 @@ fields(limit) ->
]), ]),
Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50}, Meta = #{in => query, desc => Desc, default => ?DEFAULT_ROW, example => 50},
[{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}]; [{limit, hoconsc:mk(range(1, ?MAX_ROW_LIMIT), Meta)}];
fields(cursor) ->
Desc = <<"Opaque value representing the current iteration state.">>,
Meta = #{default => none, in => query, desc => Desc},
[{cursor, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
fields(cursor_response) ->
Desc = <<"Opaque value representing the current iteration state.">>,
Meta = #{desc => Desc, required => false},
[{cursor, hoconsc:mk(binary(), Meta)}];
fields(count) -> fields(count) ->
Desc = << Desc = <<
"Total number of records matching the query.<br/>" "Total number of records matching the query.<br/>"
@ -197,6 +205,8 @@ fields(start) ->
[{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}]; [{start, hoconsc:mk(hoconsc:union([none, binary()]), Meta)}];
fields(meta) -> fields(meta) ->
fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext); fields(page) ++ fields(limit) ++ fields(count) ++ fields(hasnext);
fields(meta_with_cursor) ->
fields(count) ++ fields(hasnext) ++ fields(cursor_response);
fields(continuation_meta) -> fields(continuation_meta) ->
fields(start) ++ fields(position). fields(start) ++ fields(position).

View File

@ -38,6 +38,7 @@
-export([ -export([
clients/2, clients/2,
list_clients_v2/2,
kickout_clients/2, kickout_clients/2,
client/2, client/2,
subscriptions/2, subscriptions/2,
@ -63,6 +64,10 @@
%% for batch operation %% for batch operation
-export([do_subscribe/3]). -export([do_subscribe/3]).
-ifdef(TEST).
-export([parse_cursor/2, serialize_cursor/1]).
-endif.
-define(TAGS, [<<"Clients">>]). -define(TAGS, [<<"Clients">>]).
-define(CLIENT_QSCHEMA, [ -define(CLIENT_QSCHEMA, [
@ -95,6 +100,14 @@
message => <<"Client connection has been shutdown">> message => <<"Client connection has been shutdown">>
}). }).
%% tags
-define(CURSOR_VSN1, 1).
-define(CURSOR_TYPE_ETS, 1).
-define(CURSOR_TYPE_DS, 2).
%% field keys
-define(CURSOR_ETS_NODE_IDX, 1).
-define(CURSOR_ETS_CONT, 2).
namespace() -> undefined. namespace() -> undefined.
api_spec() -> api_spec() ->
@ -103,6 +116,7 @@ api_spec() ->
paths() -> paths() ->
[ [
"/clients", "/clients",
"/clients_v2",
"/clients/kickout/bulk", "/clients/kickout/bulk",
"/clients/:clientid", "/clients/:clientid",
"/clients/:clientid/authorization/cache", "/clients/:clientid/authorization/cache",
@ -117,115 +131,38 @@ paths() ->
"/sessions_count" "/sessions_count"
]. ].
schema("/clients_v2") ->
#{
'operationId' => list_clients_v2,
get => #{
security => [],
description => ?DESC(list_clients),
tags => ?TAGS,
parameters => fields(list_clients_v2_inputs),
responses => #{
200 =>
emqx_dashboard_swagger:schema_with_example(?R_REF(list_clients_v2_response), #{
<<"data">> => [client_example()],
<<"meta">> => #{
<<"count">> => 1,
<<"cursor">> => <<"g2wAAAADYQFhAm0AAAACYzJq">>,
<<"hasnext">> => true
}
}),
400 =>
emqx_dashboard_swagger:error_codes(
['INVALID_PARAMETER'], <<"Invalid parameters">>
)
}
}
};
schema("/clients") -> schema("/clients") ->
#{ #{
'operationId' => clients, 'operationId' => clients,
get => #{ get => #{
description => ?DESC(list_clients), description => ?DESC(list_clients),
tags => ?TAGS, tags => ?TAGS,
parameters => [ parameters => fields(list_clients_v1_inputs),
hoconsc:ref(emqx_dashboard_swagger, page),
hoconsc:ref(emqx_dashboard_swagger, limit),
{node,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Node name">>,
example => <<"emqx@127.0.0.1">>
})},
{username,
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<
"User name, multiple values can be specified by"
" repeating the parameter: username=u1&username=u2"
>>
})},
{ip_address,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Client's IP address">>,
example => <<"127.0.0.1">>
})},
{conn_state,
hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{
in => query,
required => false,
desc =>
<<"The current connection status of the client, ",
"the possible values are connected,idle,disconnected">>
})},
{clean_start,
hoconsc:mk(boolean(), #{
in => query,
required => false,
description => <<"Whether the client uses a new session">>
})},
{proto_ver,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Client protocol version">>
})},
{like_clientid,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Fuzzy search `clientid` as substring">>
})},
{like_username,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Fuzzy search `username` as substring">>
})},
{gte_created_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
<<"Search client session creation time by greater",
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{lte_created_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
<<"Search client session creation time by less",
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{gte_connected_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
"Search client connection creation time by greater"
" than or equal method, rfc3339 or timestamp(epoch millisecond)"
>>
})},
{lte_connected_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
"Search client connection creation time by less"
" than or equal method, rfc3339 or timestamp(millisecond)"
>>
})},
{clientid,
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<
"Client ID, multiple values can be specified by"
" repeating the parameter: clientid=c1&clientid=c2"
>>
})},
?R_REF(requested_client_fields)
],
responses => #{ responses => #{
200 => 200 =>
emqx_dashboard_swagger:schema_with_example(?R_REF(clients), #{ emqx_dashboard_swagger:schema_with_example(?R_REF(clients), #{
@ -453,11 +390,129 @@ schema("/sessions_count") ->
} }
}. }.
fields(list_clients_v2_inputs) ->
[
hoconsc:ref(emqx_dashboard_swagger, cursor)
| fields(common_list_clients_input)
];
fields(list_clients_v1_inputs) ->
[
hoconsc:ref(emqx_dashboard_swagger, page),
{node,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Node name">>,
example => <<"emqx@127.0.0.1">>
})}
| fields(common_list_clients_input)
];
fields(common_list_clients_input) ->
[
hoconsc:ref(emqx_dashboard_swagger, limit),
{username,
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<
"User name, multiple values can be specified by"
" repeating the parameter: username=u1&username=u2"
>>
})},
{ip_address,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Client's IP address">>,
example => <<"127.0.0.1">>
})},
{conn_state,
hoconsc:mk(hoconsc:enum([connected, idle, disconnected]), #{
in => query,
required => false,
desc =>
<<"The current connection status of the client, ",
"the possible values are connected,idle,disconnected">>
})},
{clean_start,
hoconsc:mk(boolean(), #{
in => query,
required => false,
description => <<"Whether the client uses a new session">>
})},
{proto_ver,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Client protocol version">>
})},
{like_clientid,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Fuzzy search `clientid` as substring">>
})},
{like_username,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Fuzzy search `username` as substring">>
})},
{gte_created_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
<<"Search client session creation time by greater",
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{lte_created_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc =>
<<"Search client session creation time by less",
" than or equal method, rfc3339 or timestamp(millisecond)">>
})},
{gte_connected_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
"Search client connection creation time by greater"
" than or equal method, rfc3339 or timestamp(epoch millisecond)"
>>
})},
{lte_connected_at,
hoconsc:mk(emqx_utils_calendar:epoch_millisecond(), #{
in => query,
required => false,
desc => <<
"Search client connection creation time by less"
" than or equal method, rfc3339 or timestamp(millisecond)"
>>
})},
{clientid,
hoconsc:mk(hoconsc:array(binary()), #{
in => query,
required => false,
desc => <<
"Client ID, multiple values can be specified by"
" repeating the parameter: clientid=c1&clientid=c2"
>>
})},
?R_REF(requested_client_fields)
];
fields(clients) -> fields(clients) ->
[ [
{data, hoconsc:mk(hoconsc:array(?REF(client)), #{})}, {data, hoconsc:mk(hoconsc:array(?REF(client)), #{})},
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})} {meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})}
]; ];
fields(list_clients_v2_response) ->
[
{data, hoconsc:mk(hoconsc:array(?REF(client)), #{})},
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta_with_cursor), #{})}
];
fields(client) -> fields(client) ->
[ [
{awaiting_rel_cnt, {awaiting_rel_cnt,
@ -890,6 +945,218 @@ list_clients(QString) ->
{200, Response} {200, Response}
end. end.
list_clients_v2(get, #{query_string := QString0}) ->
Nodes = emqx:running_nodes(),
case maps:get(<<"cursor">>, QString0, none) of
none ->
Cursor = initial_ets_cursor(Nodes),
do_list_clients_v2(Nodes, Cursor, QString0);
CursorBin when is_binary(CursorBin) ->
case parse_cursor(CursorBin, Nodes) of
{ok, Cursor} ->
do_list_clients_v2(Nodes, Cursor, QString0);
{error, bad_cursor} ->
?BAD_REQUEST(<<"bad cursor">>)
end
end.
do_list_clients_v2(Nodes, Cursor, QString0) ->
Limit = maps:get(<<"limit">>, QString0, 100),
Acc = #{
rows => [],
n => 0,
limit => Limit
},
do_list_clients_v2(Nodes, Cursor, QString0, Acc).
do_list_clients_v2(_Nodes, Cursor = done, _QString, Acc) ->
format_results(Acc, Cursor);
do_list_clients_v2(Nodes, Cursor = #{type := ?CURSOR_TYPE_ETS, node := Node}, QString0, Acc0) ->
{Rows, NewCursor} = do_ets_select(Nodes, QString0, Cursor),
Acc1 = maps:update_with(rows, fun(Rs) -> [{Node, Rows} | Rs] end, Acc0),
Acc = #{limit := Limit, n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1),
case N >= Limit of
true ->
format_results(Acc, NewCursor);
false ->
do_list_clients_v2(Nodes, NewCursor, QString0, Acc)
end;
do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0}, QString0, Acc0) ->
#{limit := Limit} = Acc0,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
NewCursor = next_ds_cursor(Iter),
Rows1 = drop_live_and_expired(Rows0),
Rows = maybe_run_fuzzy_filter(Rows1, QString0),
Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0),
Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1),
case N >= Limit of
true ->
format_results(Acc, NewCursor);
false ->
do_list_clients_v2(Nodes, NewCursor, QString0, Acc)
end.
format_results(Acc, Cursor) ->
#{
rows := NodeRows,
n := N
} = Acc,
Meta =
case Cursor of
done ->
#{
hasnext => false,
count => N
};
_ ->
#{
hasnext => true,
count => N,
cursor => serialize_cursor(Cursor)
}
end,
Resp = #{
meta => Meta,
data => [
format_channel_info(Node, Row)
|| {Node, Rows} <- NodeRows,
Row <- Rows
]
},
?OK(Resp).
do_ets_select(Nodes, QString0, #{node := Node, node_idx := NodeIdx, cont := Cont} = _Cursor) ->
{_, QString1} = emqx_mgmt_api:parse_qstring(QString0, ?CLIENT_QSCHEMA),
Limit = maps:get(<<"limit">>, QString0, 10),
{Rows, #{cont := NewCont, node_idx := NewNodeIdx}} = ets_select(
QString1, Limit, Node, NodeIdx, Cont
),
{Rows, next_ets_cursor(Nodes, NewNodeIdx, NewCont)}.
maybe_run_fuzzy_filter(Rows, QString0) ->
{_, {_, FuzzyQString}} = emqx_mgmt_api:parse_qstring(QString0, ?CLIENT_QSCHEMA),
FuzzyFilterFn = fuzzy_filter_fun(FuzzyQString),
case FuzzyFilterFn of
undefined ->
Rows;
{Fn, Args} ->
lists:filter(
fun(E) -> erlang:apply(Fn, [E | Args]) end,
Rows
)
end.
initial_ets_cursor([Node | _Rest] = _Nodes) ->
#{
type => ?CURSOR_TYPE_ETS,
node => Node,
node_idx => 1,
cont => undefined
}.
initial_ds_cursor() ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
#{
type => ?CURSOR_TYPE_DS,
iterator => init_persistent_session_iterator()
};
false ->
done
end.
next_ets_cursor(Nodes, NodeIdx, Cont) ->
case NodeIdx > length(Nodes) of
true ->
initial_ds_cursor();
false ->
Node = lists:nth(NodeIdx, Nodes),
#{
type => ?CURSOR_TYPE_ETS,
node_idx => NodeIdx,
node => Node,
cont => Cont
}
end.
next_ds_cursor('$end_of_table') ->
done;
next_ds_cursor(Iter) ->
#{
type => ?CURSOR_TYPE_DS,
iterator => Iter
}.
parse_cursor(CursorBin, Nodes) ->
try base64:decode(CursorBin, #{mode => urlsafe, padding => false}) of
Bin ->
parse_cursor1(Bin, Nodes)
catch
_:_ ->
{error, bad_cursor}
end.
parse_cursor1(CursorBin, Nodes) ->
try binary_to_term(CursorBin, [safe]) of
[
?CURSOR_VSN1,
?CURSOR_TYPE_ETS,
#{?CURSOR_ETS_NODE_IDX := NodeIdx, ?CURSOR_ETS_CONT := Cont}
] ->
case NodeIdx > length(Nodes) of
true ->
{error, bad_cursor};
false ->
Node = lists:nth(NodeIdx, Nodes),
Cursor = #{
type => ?CURSOR_TYPE_ETS,
node => Node,
node_idx => NodeIdx,
cont => Cont
},
{ok, Cursor}
end;
[?CURSOR_VSN1, ?CURSOR_TYPE_DS, DSIter] ->
Cursor = #{type => ?CURSOR_TYPE_DS, iterator => DSIter},
{ok, Cursor};
_ ->
{error, bad_cursor}
catch
error:badarg ->
{error, bad_cursor}
end.
serialize_cursor(#{type := ?CURSOR_TYPE_ETS, node_idx := NodeIdx, cont := Cont}) ->
Cursor0 = [
?CURSOR_VSN1,
?CURSOR_TYPE_ETS,
#{?CURSOR_ETS_NODE_IDX => NodeIdx, ?CURSOR_ETS_CONT => Cont}
],
Bin = term_to_binary(Cursor0, [{compressed, 9}]),
base64:encode(Bin, #{mode => urlsafe, padding => false});
serialize_cursor(#{type := ?CURSOR_TYPE_DS, iterator := Iter}) ->
Cursor0 = [?CURSOR_VSN1, ?CURSOR_TYPE_DS, Iter],
Bin = term_to_binary(Cursor0, [{compressed, 9}]),
base64:encode(Bin, #{mode => urlsafe, padding => false}).
%% An adapter function so we can reutilize all the logic in `emqx_mgmt_api' for
%% selecting/fuzzy filters, and also reutilize its BPAPI for selecting rows.
ets_select(NQString, Limit, Node, NodeIdx, Cont) ->
QueryState0 = emqx_mgmt_api:init_query_state(
?CHAN_INFO_TAB,
NQString,
fun ?MODULE:qs2ms/2,
_Meta = #{page => unused, limit => Limit},
_Options = #{}
),
QueryState = QueryState0#{continuation => Cont},
case emqx_mgmt_api:do_query(Node, QueryState) of
{Rows, #{complete := true}} ->
{Rows, #{node_idx => NodeIdx + 1, cont => undefined}};
{Rows, #{continuation := NCont}} ->
{Rows, #{node_idx => NodeIdx, cont => NCont}}
end.
lookup(#{clientid := ClientID}) -> lookup(#{clientid := ClientID}) ->
case emqx_mgmt:lookup_client({clientid, ClientID}, ?FORMAT_FUN) of case emqx_mgmt:lookup_client({clientid, ClientID}, ?FORMAT_FUN) of
[] -> [] ->
@ -1410,13 +1677,25 @@ fuzzy_filter_fun(Fuzzy) ->
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | Fuzzy]) -> run_fuzzy_filter(
Row = {_, #{metadata := #{clientinfo := ClientInfo}}},
[{Key, like, SubStr} | RestArgs]
) ->
%% Row from DS
run_fuzzy_filter1(ClientInfo, Key, SubStr) andalso
run_fuzzy_filter(Row, RestArgs);
run_fuzzy_filter(Row = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} | RestArgs]) ->
%% Row from ETS
run_fuzzy_filter1(ClientInfo, Key, SubStr) andalso
run_fuzzy_filter(Row, RestArgs).
run_fuzzy_filter1(ClientInfo, Key, SubStr) ->
Val = Val =
case maps:get(Key, ClientInfo, <<>>) of case maps:get(Key, ClientInfo, <<>>) of
undefined -> <<>>; undefined -> <<>>;
V -> V V -> V
end, end,
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_filter(E, Fuzzy). binary:match(Val, SubStr) /= nomatch.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% format funcs %% format funcs

View File

@ -19,8 +19,9 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_router.hrl"). -include_lib("emqx/include/emqx_router.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
@ -47,7 +48,8 @@ persistent_session_testcases() ->
t_persistent_sessions2, t_persistent_sessions2,
t_persistent_sessions3, t_persistent_sessions3,
t_persistent_sessions4, t_persistent_sessions4,
t_persistent_sessions5 t_persistent_sessions5,
t_list_clients_v2
]. ].
client_msgs_testcases() -> client_msgs_testcases() ->
[ [
@ -56,11 +58,23 @@ client_msgs_testcases() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), ok = snabbkaffe:start_trace(),
Config. Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config].
end_per_suite(_) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(). Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_group(persistent_sessions, Config) -> init_per_group(persistent_sessions, Config) ->
AppSpecs = [ AppSpecs = [
@ -109,9 +123,12 @@ end_per_testcase(TC, _Config) when
?LINE, ?LINE,
fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end, fun() -> [] =:= emqx_cm:lookup_channels(local, ClientId) end,
5000 5000
); ),
ok = snabbkaffe:stop(),
ok;
end_per_testcase(_TC, _Config) -> end_per_testcase(_TC, _Config) ->
ok = snabbkaffe:stop(). ok = snabbkaffe:stop(),
ok.
t_clients(_) -> t_clients(_) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -522,6 +539,12 @@ t_persistent_sessions5(Config) ->
), ),
lists:foreach(fun emqtt:stop/1, [C3, C4]), lists:foreach(fun emqtt:stop/1, [C3, C4]),
lists:foreach(
fun(ClientId) ->
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId])
end,
[ClientId1, ClientId2, ClientId3, ClientId4]
),
ok ok
end, end,
@ -1415,6 +1438,319 @@ t_subscribe_shared_topic_nl(_Config) ->
PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1}) PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
). ).
t_list_clients_v2(Config) ->
[N1, N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
Port2 = get_mqtt_port(N2, tcp),
?check_trace(
begin
ClientId1 = <<"ca1">>,
ClientId2 = <<"c2">>,
ClientId3 = <<"c3">>,
ClientId4 = <<"ca4">>,
ClientId5 = <<"ca5">>,
ClientId6 = <<"c6">>,
AllClientIds = [
ClientId1,
ClientId2,
ClientId3,
ClientId4,
ClientId5,
ClientId6
],
C1 = connect_client(#{port => Port1, clientid => ClientId1, clean_start => true}),
C2 = connect_client(#{port => Port2, clientid => ClientId2, clean_start => true}),
C3 = connect_client(#{port => Port1, clientid => ClientId3, clean_start => true}),
C4 = connect_client(#{port => Port2, clientid => ClientId4, clean_start => true}),
%% in-memory clients
C5 = connect_client(#{
port => Port1, clientid => ClientId5, expiry => 0, clean_start => true
}),
C6 = connect_client(#{
port => Port2, clientid => ClientId6, expiry => 0, clean_start => true
}),
%% offline persistent clients
ok = emqtt:stop(C3),
ok = emqtt:stop(C4),
%% one by one
QueryParams1 = #{limit => "1"},
Res1 = list_all_v2(APIPort, QueryParams1),
?assertMatch(
[
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 1
}
}
],
Res1
),
assert_contains_clientids(Res1, AllClientIds),
%% Reusing the same cursors yield the same pages
traverse_in_reverse_v2(APIPort, QueryParams1, Res1),
%% paging
QueryParams2 = #{limit => "4"},
Res2 = list_all_v2(APIPort, QueryParams2),
?assertMatch(
[
#{
<<"data">> := [_, _, _, _],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 4,
<<"cursor">> := _
}
},
#{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 2
}
}
],
Res2
),
assert_contains_clientids(Res2, AllClientIds),
traverse_in_reverse_v2(APIPort, QueryParams2, Res2),
QueryParams3 = #{limit => "2"},
Res3 = list_all_v2(APIPort, QueryParams3),
?assertMatch(
[
#{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 2,
<<"cursor">> := _
}
},
#{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 2,
<<"cursor">> := _
}
},
#{
<<"data">> := [_, _],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 2
}
}
],
Res3
),
assert_contains_clientids(Res3, AllClientIds),
traverse_in_reverse_v2(APIPort, QueryParams3, Res3),
%% fuzzy filters
QueryParams4 = #{limit => "100", like_clientid => "ca"},
Res4 = list_all_v2(APIPort, QueryParams4),
?assertMatch(
[
#{
<<"data">> := [_, _, _],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 3
}
}
],
Res4
),
assert_contains_clientids(Res4, [ClientId1, ClientId4, ClientId5]),
traverse_in_reverse_v2(APIPort, QueryParams4, Res4),
QueryParams5 = #{limit => "1", like_clientid => "ca"},
Res5 = list_all_v2(APIPort, QueryParams5),
?assertMatch(
[
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := true,
<<"count">> := 1,
<<"cursor">> := _
}
},
#{
<<"data">> := [_],
<<"meta">> :=
#{
<<"hasnext">> := false,
<<"count">> := 1
}
}
],
Res5
),
assert_contains_clientids(Res5, [ClientId1, ClientId4, ClientId5]),
traverse_in_reverse_v2(APIPort, QueryParams5, Res5),
lists:foreach(
fun(C) ->
{_, {ok, _}} =
?wait_async_action(
emqtt:stop(C),
#{?snk_kind := emqx_cm_clean_down}
)
end,
[C1, C2, C5, C6]
),
%% Verify that a malicious cursor that could generate an atom on the node is
%% rejected
EvilAtomBin0 = <<131, 100, 0, 5, "some_atom_that_doesnt_exist_on_the_remote_node">>,
EvilAtomBin = base64:encode(EvilAtomBin0, #{mode => urlsafe, padding => false}),
?assertMatch(
{error, {{_, 400, _}, _, #{<<"message">> := <<"bad cursor">>}}},
list_v2_request(APIPort, #{limit => "1", cursor => EvilAtomBin})
),
%% Verify that the atom was not created
erpc:call(N1, fun() ->
?assertError(badarg, binary_to_term(EvilAtomBin0, [safe]))
end),
?assert(is_atom(binary_to_term(EvilAtomBin0))),
lists:foreach(
fun(ClientId) ->
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId])
end,
AllClientIds
),
ok
end,
[]
),
ok.
t_cursor_serde_prop(_Config) ->
?assert(proper:quickcheck(cursor_serde_prop(), [{numtests, 100}, {to_file, user}])).
cursor_serde_prop() ->
?FORALL(
NumNodes,
range(1, 10),
?FORALL(
Cursor,
list_clients_cursor_gen(NumNodes),
begin
Nodes = lists:seq(1, NumNodes),
Bin = emqx_mgmt_api_clients:serialize_cursor(Cursor),
Res = emqx_mgmt_api_clients:parse_cursor(Bin, Nodes),
?WHENFAIL(
ct:pal("original:\n ~p\nroundtrip:\n ~p", [Cursor, Res]),
{ok, Cursor} =:= Res
)
end
)
).
list_clients_cursor_gen(NumNodes) ->
oneof([
lists_clients_ets_cursor_gen(NumNodes),
lists_clients_ds_cursor_gen()
]).
-define(CURSOR_TYPE_ETS, 1).
-define(CURSOR_TYPE_DS, 2).
lists_clients_ets_cursor_gen(NumNodes) ->
?LET(
{NodeIdx, Cont},
{range(1, NumNodes), oneof([undefined, tuple()])},
#{
type => ?CURSOR_TYPE_ETS,
node => NodeIdx,
node_idx => NodeIdx,
cont => Cont
}
).
lists_clients_ds_cursor_gen() ->
?LET(
Iter,
oneof(['$end_of_table', list(term())]),
#{
type => ?CURSOR_TYPE_DS,
iterator => Iter
}
).
time_string_to_epoch_millisecond(DateTime) -> time_string_to_epoch_millisecond(DateTime) ->
time_string_to_epoch(DateTime, millisecond). time_string_to_epoch(DateTime, millisecond).
@ -1472,6 +1808,31 @@ list_request(Port, QueryParams) ->
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]), Path = emqx_mgmt_api_test_util:api_path(Host, ["clients"]),
request(get, Path, [], QueryParams). request(get, Path, [], QueryParams).
list_v2_request(Port, QueryParams = #{}) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients_v2"]),
QS = uri_string:compose_query(maps:to_list(emqx_utils_maps:binary_key_map(QueryParams))),
request(get, Path, [], QS).
list_all_v2(Port, QueryParams = #{}) ->
do_list_all_v2(Port, QueryParams, _Acc = []).
do_list_all_v2(Port, QueryParams, Acc) ->
case list_v2_request(Port, QueryParams) of
{ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"cursor">> := Cursor}}}} ->
do_list_all_v2(Port, QueryParams#{cursor => Cursor}, [Resp | Acc]);
{ok, {{_, 200, _}, _, Resp = #{<<"meta">> := #{<<"hasnext">> := false}}}} ->
lists:reverse([Resp | Acc]);
Other ->
error(
{unexpected_response, #{
acc_so_far => Acc,
response => Other,
query_params => QueryParams
}}
)
end.
lookup_request(ClientId) -> lookup_request(ClientId) ->
lookup_request(ClientId, 18083). lookup_request(ClientId, 18083).
@ -1535,3 +1896,44 @@ connect_client(Opts) ->
]), ]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
C. C.
assert_contains_clientids(Results, ExpectedClientIds) ->
ContainedClientIds = [
ClientId
|| #{<<"data">> := Rows} <- Results,
#{<<"clientid">> := ClientId} <- Rows
],
?assertEqual(
lists:sort(ExpectedClientIds),
lists:sort(ContainedClientIds),
#{results => Results}
).
traverse_in_reverse_v2(APIPort, QueryParams0, Results) ->
Cursors0 =
lists:map(
fun(#{<<"meta">> := Meta}) ->
maps:get(<<"cursor">>, Meta, <<"wontbeused">>)
end,
Results
),
Cursors1 = [<<"none">> | lists:droplast(Cursors0)],
DirectOrderClientIds = [
ClientId
|| #{<<"data">> := Rows} <- Results,
#{<<"clientid">> := ClientId} <- Rows
],
ReverseCursors = lists:reverse(Cursors1),
do_traverse_in_reverse_v2(
APIPort, QueryParams0, ReverseCursors, DirectOrderClientIds, _Acc = []
).
do_traverse_in_reverse_v2(_APIPort, _QueryParams0, _Cursors = [], DirectOrderClientIds, Acc) ->
?assertEqual(DirectOrderClientIds, Acc);
do_traverse_in_reverse_v2(APIPort, QueryParams0, [Cursor | Rest], DirectOrderClientIds, Acc) ->
QueryParams = QueryParams0#{cursor => Cursor},
Res0 = list_v2_request(APIPort, QueryParams),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := _}}}, Res0),
{ok, {{_, 200, _}, _, #{<<"data">> := Rows}}} = Res0,
ClientIds = [ClientId || #{<<"clientid">> := ClientId} <- Rows],
do_traverse_in_reverse_v2(APIPort, QueryParams0, Rest, DirectOrderClientIds, ClientIds ++ Acc).

View File

@ -0,0 +1 @@
Added new `GET /api/v5/clients_v2` API that uses cursors instead of page numbers for pagination. This should be more efficient than the old API endpoint, which currently traverses tables multiple times.