Merge pull request #12734 from keynslug/fix/EMQX-12030/topics-api

fix(api-topics): expose persistent session topics in APIs
This commit is contained in:
Andrew Mayorov 2024-03-19 21:08:23 +01:00 committed by GitHub
commit e2db038e7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 199 additions and 83 deletions

View File

@ -21,11 +21,12 @@
-record(ps_route, { -record(ps_route, {
topic :: binary(), topic :: binary(),
dest :: emqx_persistent_session_ds:id() dest :: emqx_persistent_session_ds:id() | '_'
}). }).
-record(ps_routeidx, { -record(ps_routeidx, {
entry :: '$1' | emqx_topic_index:key(emqx_persistent_session_ds_router:dest()), entry :: '$1' | emqx_topic_index:key(emqx_persistent_session_ds_router:dest()),
unused = [] :: nil() unused = [] :: nil() | '_'
}). }).
-endif. -endif.

View File

@ -32,6 +32,9 @@
foldl_routes/2 foldl_routes/2
]). ]).
%% Topics API
-export([stream/1]).
-export([cleanup_routes/1]). -export([cleanup_routes/1]).
-export([print_routes/1]). -export([print_routes/1]).
-export([topics/0]). -export([topics/0]).
@ -196,6 +199,18 @@ foldl_routes(FoldFun, AccIn) ->
foldr_routes(FoldFun, AccIn) -> foldr_routes(FoldFun, AccIn) ->
fold_routes(foldr, FoldFun, AccIn). fold_routes(foldr, FoldFun, AccIn).
%%--------------------------------------------------------------------
%% Topic API
%%--------------------------------------------------------------------
%% @doc Create a `emqx_utils_stream:stream(#route{})` out of the router state,
%% potentially filtered by a topic or topic filter. The stream emits `#route{}`
%% records since this is what `emqx_mgmt_api_topics` knows how to deal with.
-spec stream(_MTopic :: '_' | emqx_types:topic()) ->
emqx_utils_stream:stream(emqx_types:route()).
stream(MTopic) ->
emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal fns %% Internal fns
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -225,6 +240,12 @@ get_dest_session_id({_, DSSessionId}) ->
get_dest_session_id(DSSessionId) -> get_dest_session_id(DSSessionId) ->
DSSessionId. DSSessionId.
export_route(#ps_route{topic = Topic, dest = Dest}) ->
#route{topic = Topic, dest = Dest}.
export_routeidx(#ps_routeidx{entry = M}) ->
#route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
match_to_route(M) -> match_to_route(M) ->
#ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
@ -242,3 +263,35 @@ list_route_tab_topics() ->
mria_route_tab_delete(Route) -> mria_route_tab_delete(Route) ->
mria:dirty_delete_object(?PS_ROUTER_TAB, Route). mria:dirty_delete_object(?PS_ROUTER_TAB, Route).
%% @doc Create a `emqx_utils_stream:stream(#route{})` out of contents of either of
%% 2 route tables, optionally filtered by a topic or topic filter. If the latter is
%% specified, then it doesn't make sense to scan through `?PS_ROUTER_TAB` if it's
%% a wildcard topic, and vice versa for `?PS_FILTERS_TAB` if it's not, so we optimize
%% it away by returning an empty stream in those cases.
stream(Tab = ?PS_ROUTER_TAB, MTopic) ->
case MTopic == '_' orelse not emqx_topic:wildcard(MTopic) of
true ->
MatchSpec = #ps_route{topic = MTopic, _ = '_'},
mk_tab_stream(Tab, MatchSpec, fun export_route/1);
false ->
emqx_utils_stream:empty()
end;
stream(Tab = ?PS_FILTERS_TAB, MTopic) ->
case MTopic == '_' orelse emqx_topic:wildcard(MTopic) of
true ->
MatchSpec = #ps_routeidx{entry = emqx_trie_search:make_pat(MTopic, '_'), _ = '_'},
mk_tab_stream(Tab, MatchSpec, fun export_routeidx/1);
false ->
emqx_utils_stream:empty()
end.
mk_tab_stream(Tab, MatchSpec, Mapper) ->
%% NOTE: Currently relying on the fact that tables are backed by ETSes.
emqx_utils_stream:map(
Mapper,
emqx_utils_stream:ets(fun
(undefined) -> ets:match_object(Tab, MatchSpec, 1);
(Cont) -> ets:match_object(Cont)
end)
).

View File

@ -58,7 +58,7 @@
]). ]).
%% Topics API %% Topics API
-export([select/3]). -export([stream/1]).
-export([print_routes/1]). -export([print_routes/1]).
@ -266,18 +266,15 @@ mria_batch_v1(Batch) ->
batch_get_action(Op) -> batch_get_action(Op) ->
element(1, Op). element(1, Op).
-spec select(Spec, _Limit :: pos_integer(), Continuation) -> -spec stream(_Spec :: {_TopicPat, _DestPat}) ->
{[emqx_types:route()], Continuation} | '$end_of_table' emqx_utils_stream:stream(emqx_types:route()).
when stream(MatchSpec) ->
Spec :: {_TopicPat, _DestPat}, stream(get_schema_vsn(), MatchSpec).
Continuation :: term() | '$end_of_table'.
select(MatchSpec, Limit, Cont) ->
select(get_schema_vsn(), MatchSpec, Limit, Cont).
select(v2, MatchSpec, Limit, Cont) -> stream(v2, MatchSpec) ->
select_v2(MatchSpec, Limit, Cont); stream_v2(MatchSpec);
select(v1, MatchSpec, Limit, Cont) -> stream(v1, MatchSpec) ->
select_v1(MatchSpec, Limit, Cont). stream_v1(MatchSpec).
-spec topics() -> list(emqx_types:topic()). -spec topics() -> list(emqx_types:topic()).
topics() -> topics() ->
@ -452,10 +449,8 @@ cleanup_routes_v1_fallback(Node) ->
] ]
end). end).
select_v1({MTopic, MDest}, Limit, undefined) -> stream_v1(Spec) ->
ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit); mk_route_stream(?ROUTE_TAB, Spec).
select_v1(_Spec, _Limit, Cont) ->
ets:select(Cont).
list_topics_v1() -> list_topics_v1() ->
list_route_tab_topics(). list_route_tab_topics().
@ -591,36 +586,27 @@ make_route_rec_pat(DestPattern) ->
[{1, route}, {#route.dest, DestPattern}] [{1, route}, {#route.dest, DestPattern}]
). ).
select_v2(Spec, Limit, undefined) -> stream_v2(Spec) ->
Stream = mk_route_stream(Spec),
select_next(Limit, Stream);
select_v2(_Spec, Limit, Stream) ->
select_next(Limit, Stream).
select_next(N, Stream) ->
case emqx_utils_stream:consume(N, Stream) of
{Routes, SRest} ->
{Routes, SRest};
Routes ->
{Routes, '$end_of_table'}
end.
mk_route_stream(Spec) ->
emqx_utils_stream:chain( emqx_utils_stream:chain(
mk_route_stream(route, Spec), mk_route_stream(?ROUTE_TAB, Spec),
mk_route_stream(filter, Spec) mk_route_stream(?ROUTE_TAB_FILTERS, Spec)
). ).
mk_route_stream(route, Spec) -> mk_route_stream(Tab = ?ROUTE_TAB, {MTopic, MDest}) ->
emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end); emqx_utils_stream:ets(fun
mk_route_stream(filter, {MTopic, MDest}) -> (undefined) ->
ets:match_object(Tab, #route{topic = MTopic, dest = MDest}, 1);
(Cont) ->
ets:match_object(Cont)
end);
mk_route_stream(Tab = ?ROUTE_TAB_FILTERS, {MTopic, MDest}) ->
emqx_utils_stream:map( emqx_utils_stream:map(
fun routeidx_to_route/1, fun routeidx_to_route/1,
emqx_utils_stream:ets( emqx_utils_stream:ets(
fun fun
(undefined) -> (undefined) ->
MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)}, MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)},
ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1); ets:match_object(Tab, MatchSpec, 1);
(Cont) -> (Cont) ->
ets:match_object(Cont) ets:match_object(Cont)
end end

View File

@ -96,6 +96,11 @@ fields(topic) ->
hoconsc:mk(binary(), #{ hoconsc:mk(binary(), #{
desc => <<"Node">>, desc => <<"Node">>,
required => true required => true
})},
{session,
hoconsc:mk(binary(), #{
desc => <<"Session ID">>,
required => false
})} })}
]. ].
@ -113,8 +118,8 @@ do_list(Params) ->
try try
Pager = parse_pager_params(Params), Pager = parse_pager_params(Params),
{_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA), {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
QState = Pager#{continuation => undefined}, Stream = mk_topic_stream(qs2ms(Query)),
QResult = eval_topic_query(qs2ms(Query), QState), QResult = eval_topic_query(Stream, Pager, emqx_mgmt_api:init_query_result()),
{200, format_list_response(Pager, Query, QResult)} {200, format_list_response(Pager, Query, QResult)}
catch catch
throw:{error, page_limit_invalid} -> throw:{error, page_limit_invalid} ->
@ -160,31 +165,37 @@ gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) -> gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
{MTopic, QNode}. {MTopic, QNode}.
eval_topic_query(MS, QState) -> mk_topic_stream(Spec = {MTopic, _MDest = '_'}) ->
finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())). emqx_utils_stream:chain(emqx_router:stream(Spec), mk_persistent_topic_stream(MTopic));
mk_topic_stream(Spec) ->
%% NOTE: Assuming that no persistent topic ever matches a query with `node` filter.
emqx_router:stream(Spec).
eval_topic_query(MS, QState, QResult) -> mk_persistent_topic_stream(Spec) ->
case eval_topic_query_page(MS, QState) of case emqx_persistent_message:is_persistence_enabled() of
{Rows, '$end_of_table'} -> true ->
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), emqx_persistent_session_ds_router:stream(Spec);
NQResult#{complete => true}; false ->
{Rows, NCont} -> emqx_utils_stream:empty()
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
{more, NQResult} ->
eval_topic_query(MS, QState#{continuation := NCont}, NQResult);
{enough, NQResult} ->
NQResult#{complete => false}
end;
'$end_of_table' ->
QResult#{complete => true}
end. end.
eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) -> eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
emqx_router:select(MS, Limit, Cont). case emqx_utils_stream:consume(Limit, Stream) of
{Rows, NStream} ->
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
{more, NQResult} ->
eval_topic_query(NStream, QState, NQResult);
{enough, NQResult} ->
finalize_query(false, NQResult)
end;
Rows when is_list(Rows) ->
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
finalize_query(true, NQResult)
end.
finalize_query(QResult = #{overflow := Overflow, complete := Complete}) -> finalize_query(Complete, QResult = #{overflow := Overflow}) ->
HasNext = Overflow orelse not Complete, HasNext = Overflow orelse not Complete,
QResult#{hasnext => HasNext}. QResult#{complete => Complete, hasnext => HasNext}.
format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) -> format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
#{ #{
@ -205,7 +216,9 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
format(#route{topic = Topic, dest = {Group, Node}}) -> format(#route{topic = Topic, dest = {Group, Node}}) ->
#{topic => ?SHARE(Group, Topic), node => Node}; #{topic => ?SHARE(Group, Topic), node => Node};
format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
#{topic => Topic, node => Node}. #{topic => Topic, node => Node};
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
#{topic => Topic, session => SessionId}.
topic_param(In) -> topic_param(In) ->
{ {

View File

@ -56,12 +56,10 @@ client_msgs_testcases() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
ok = snabbkaffe:start_trace(),
emqx_mgmt_api_test_util:init_suite(), emqx_mgmt_api_test_util:init_suite(),
Config. Config.
end_per_suite(_) -> end_per_suite(_) ->
ok = snabbkaffe:stop(),
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
init_per_group(persistent_sessions, Config) -> init_per_group(persistent_sessions, Config) ->
@ -95,10 +93,15 @@ end_per_group(persistent_sessions, Config) ->
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(TC, _Config) when end_per_testcase(TC, _Config) when
TC =:= t_inflight_messages; TC =:= t_inflight_messages;
TC =:= t_mqueue_messages TC =:= t_mqueue_messages
-> ->
ok = snabbkaffe:stop(),
ClientId = atom_to_binary(TC), ClientId = atom_to_binary(TC),
lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)), lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)),
ok = emqx_common_test_helpers:wait_for( ok = emqx_common_test_helpers:wait_for(
@ -108,7 +111,7 @@ end_per_testcase(TC, _Config) when
5000 5000
); );
end_per_testcase(_TC, _Config) -> end_per_testcase(_TC, _Config) ->
ok. ok = snabbkaffe:stop().
t_clients(_) -> t_clients(_) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -313,8 +316,7 @@ t_persistent_sessions2(Config) ->
%% 2) Client connects to the same node and takes over, listed only once. %% 2) Client connects to the same node and takes over, listed only once.
C2 = connect_client(#{port => Port1, clientid => ClientId}), C2 = connect_client(#{port => Port1, clientid => ClientId}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
ok = emqtt:stop(C2), ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}),
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
?retry( ?retry(
100, 100,
20, 20,
@ -322,9 +324,7 @@ t_persistent_sessions2(Config) ->
{ok, {{_, 200, _}, _, #{<<"data">> := []}}}, {ok, {{_, 200, _}, _, #{<<"data">> := []}}},
list_request(APIPort) list_request(APIPort)
) )
), )
ok
end, end,
[] []
), ),
@ -360,10 +360,7 @@ t_persistent_sessions3(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1)) list_request(APIPort, "node=" ++ atom_to_list(N1))
) )
), ),
ok = emqtt:stop(C2), ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
end, end,
[] []
), ),
@ -403,10 +400,7 @@ t_persistent_sessions4(Config) ->
list_request(APIPort, "node=" ++ atom_to_list(N1)) list_request(APIPort, "node=" ++ atom_to_list(N1))
) )
), ),
ok = emqtt:stop(C2), ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0})
ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
ok
end, end,
[] []
), ),

View File

@ -27,7 +27,7 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[ [
emqx, {emqx, "session_persistence.enable = true"},
emqx_management, emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard() emqx_mgmt_api_test_util:emqx_dashboard()
], ],
@ -204,13 +204,82 @@ t_shared_topics_invalid(_Config) ->
emqx_utils_json:decode(Body, [return_maps]) emqx_utils_json:decode(Body, [return_maps])
). ).
t_persistent_topics(_Config) ->
PersistentOpts = #{
proto_ver => v5,
properties => #{'Session-Expiry-Interval' => 300}
},
Client1 = client(t_persistent_topics_m1),
Client2 = client(t_persistent_topics_m2),
SessionId1 = <<"t_persistent_topics_p1">>,
SessionId2 = <<"t_persistent_topics_p2">>,
ClientPersistent1 = client(SessionId1, PersistentOpts),
ClientPersistent2 = client(SessionId2, PersistentOpts),
_ = [
?assertMatch({ok, _, _}, emqtt:subscribe(Client, Topic))
|| {Client, Topics} <- [
{Client1, [<<"t/client/mem">>, <<"t/+">>]},
{Client2, [<<"t/client/mem">>, <<"t/+">>]},
{ClientPersistent1, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]},
{ClientPersistent2, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]}
],
Topic <- Topics
],
Matched = request_json(get, ["topics"]),
?assertMatch(
#{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 8},
maps:get(<<"meta">>, Matched)
),
%% Get back both topics for both persistent and in-memory subscriptions.
Expected = [
#{<<"topic">> => <<"t/+">>, <<"node">> => atom_to_binary(node())},
#{<<"topic">> => <<"t/+">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/+">>, <<"session">> => SessionId2},
#{<<"topic">> => <<"t/client/mem">>, <<"node">> => atom_to_binary(node())},
#{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId2},
#{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId1},
#{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId2}
],
?assertEqual(
lists:sort(Expected),
lists:sort(maps:get(<<"data">>, Matched))
),
%% Are results the same when paginating?
#{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]),
#{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]),
#{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]),
?assertEqual(
lists:sort(Expected),
lists:sort(Page1 ++ Page2 ++ Page3)
),
%% Filtering by node makes no sense for persistent sessions.
?assertMatch(
#{
<<"data">> := [
#{<<"topic">> := <<"t/client/mem">>, <<"node">> := _},
#{<<"topic">> := <<"t/+">>, <<"node">> := _}
],
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 2}
},
request_json(get, ["topics"], [{"node", atom_to_list(node())}])
).
%% Utilities %% Utilities
client(Name) -> client(Name) ->
{ok, Client} = emqtt:start_link(#{ client(Name, #{}).
username => emqx_utils_conv:bin(Name),
clientid => emqx_utils_conv:bin(Name) client(Name, Overrides) ->
}), {ok, Client} = emqtt:start_link(
maps:merge(
#{
username => emqx_utils_conv:bin(Name),
clientid => emqx_utils_conv:bin(Name)
},
Overrides
)
),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
Client. Client.