diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl index f06e8369a..dc487376b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl @@ -21,11 +21,12 @@ -record(ps_route, { topic :: binary(), - dest :: emqx_persistent_session_ds:id() + dest :: emqx_persistent_session_ds:id() | '_' }). + -record(ps_routeidx, { entry :: '$1' | emqx_topic_index:key(emqx_persistent_session_ds_router:dest()), - unused = [] :: nil() + unused = [] :: nil() | '_' }). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_router.erl b/apps/emqx/src/emqx_persistent_session_ds_router.erl index 9a1897c8c..46e45233d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_router.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_router.erl @@ -32,6 +32,9 @@ foldl_routes/2 ]). +%% Topics API +-export([stream/1]). + -export([cleanup_routes/1]). -export([print_routes/1]). -export([topics/0]). @@ -196,6 +199,18 @@ foldl_routes(FoldFun, AccIn) -> foldr_routes(FoldFun, AccIn) -> fold_routes(foldr, FoldFun, AccIn). +%%-------------------------------------------------------------------- +%% Topic API +%%-------------------------------------------------------------------- + +%% @doc Create a `emqx_utils_stream:stream(#route{})` out of the router state, +%% potentially filtered by a topic or topic filter. The stream emits `#route{}` +%% records since this is what `emqx_mgmt_api_topics` knows how to deal with. +-spec stream(_MTopic :: '_' | emqx_types:topic()) -> + emqx_utils_stream:stream(emqx_types:route()). +stream(MTopic) -> + emqx_utils_stream:chain(stream(?PS_ROUTER_TAB, MTopic), stream(?PS_FILTERS_TAB, MTopic)). + %%-------------------------------------------------------------------- %% Internal fns %%-------------------------------------------------------------------- @@ -225,6 +240,12 @@ get_dest_session_id({_, DSSessionId}) -> get_dest_session_id(DSSessionId) -> DSSessionId. +export_route(#ps_route{topic = Topic, dest = Dest}) -> + #route{topic = Topic, dest = Dest}. + +export_routeidx(#ps_routeidx{entry = M}) -> + #route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. + match_to_route(M) -> #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}. @@ -242,3 +263,35 @@ list_route_tab_topics() -> mria_route_tab_delete(Route) -> mria:dirty_delete_object(?PS_ROUTER_TAB, Route). + +%% @doc Create a `emqx_utils_stream:stream(#route{})` out of contents of either of +%% 2 route tables, optionally filtered by a topic or topic filter. If the latter is +%% specified, then it doesn't make sense to scan through `?PS_ROUTER_TAB` if it's +%% a wildcard topic, and vice versa for `?PS_FILTERS_TAB` if it's not, so we optimize +%% it away by returning an empty stream in those cases. +stream(Tab = ?PS_ROUTER_TAB, MTopic) -> + case MTopic == '_' orelse not emqx_topic:wildcard(MTopic) of + true -> + MatchSpec = #ps_route{topic = MTopic, _ = '_'}, + mk_tab_stream(Tab, MatchSpec, fun export_route/1); + false -> + emqx_utils_stream:empty() + end; +stream(Tab = ?PS_FILTERS_TAB, MTopic) -> + case MTopic == '_' orelse emqx_topic:wildcard(MTopic) of + true -> + MatchSpec = #ps_routeidx{entry = emqx_trie_search:make_pat(MTopic, '_'), _ = '_'}, + mk_tab_stream(Tab, MatchSpec, fun export_routeidx/1); + false -> + emqx_utils_stream:empty() + end. + +mk_tab_stream(Tab, MatchSpec, Mapper) -> + %% NOTE: Currently relying on the fact that tables are backed by ETSes. + emqx_utils_stream:map( + Mapper, + emqx_utils_stream:ets(fun + (undefined) -> ets:match_object(Tab, MatchSpec, 1); + (Cont) -> ets:match_object(Cont) + end) + ). diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3f5b0181f..ef9aac628 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -58,7 +58,7 @@ ]). %% Topics API --export([select/3]). +-export([stream/1]). -export([print_routes/1]). @@ -266,18 +266,15 @@ mria_batch_v1(Batch) -> batch_get_action(Op) -> element(1, Op). --spec select(Spec, _Limit :: pos_integer(), Continuation) -> - {[emqx_types:route()], Continuation} | '$end_of_table' -when - Spec :: {_TopicPat, _DestPat}, - Continuation :: term() | '$end_of_table'. -select(MatchSpec, Limit, Cont) -> - select(get_schema_vsn(), MatchSpec, Limit, Cont). +-spec stream(_Spec :: {_TopicPat, _DestPat}) -> + emqx_utils_stream:stream(emqx_types:route()). +stream(MatchSpec) -> + stream(get_schema_vsn(), MatchSpec). -select(v2, MatchSpec, Limit, Cont) -> - select_v2(MatchSpec, Limit, Cont); -select(v1, MatchSpec, Limit, Cont) -> - select_v1(MatchSpec, Limit, Cont). +stream(v2, MatchSpec) -> + stream_v2(MatchSpec); +stream(v1, MatchSpec) -> + stream_v1(MatchSpec). -spec topics() -> list(emqx_types:topic()). topics() -> @@ -452,10 +449,8 @@ cleanup_routes_v1_fallback(Node) -> ] end). -select_v1({MTopic, MDest}, Limit, undefined) -> - ets:match_object(?ROUTE_TAB, #route{topic = MTopic, dest = MDest}, Limit); -select_v1(_Spec, _Limit, Cont) -> - ets:select(Cont). +stream_v1(Spec) -> + mk_route_stream(?ROUTE_TAB, Spec). list_topics_v1() -> list_route_tab_topics(). @@ -591,36 +586,27 @@ make_route_rec_pat(DestPattern) -> [{1, route}, {#route.dest, DestPattern}] ). -select_v2(Spec, Limit, undefined) -> - Stream = mk_route_stream(Spec), - select_next(Limit, Stream); -select_v2(_Spec, Limit, Stream) -> - select_next(Limit, Stream). - -select_next(N, Stream) -> - case emqx_utils_stream:consume(N, Stream) of - {Routes, SRest} -> - {Routes, SRest}; - Routes -> - {Routes, '$end_of_table'} - end. - -mk_route_stream(Spec) -> +stream_v2(Spec) -> emqx_utils_stream:chain( - mk_route_stream(route, Spec), - mk_route_stream(filter, Spec) + mk_route_stream(?ROUTE_TAB, Spec), + mk_route_stream(?ROUTE_TAB_FILTERS, Spec) ). -mk_route_stream(route, Spec) -> - emqx_utils_stream:ets(fun(Cont) -> select_v1(Spec, 1, Cont) end); -mk_route_stream(filter, {MTopic, MDest}) -> +mk_route_stream(Tab = ?ROUTE_TAB, {MTopic, MDest}) -> + emqx_utils_stream:ets(fun + (undefined) -> + ets:match_object(Tab, #route{topic = MTopic, dest = MDest}, 1); + (Cont) -> + ets:match_object(Cont) + end); +mk_route_stream(Tab = ?ROUTE_TAB_FILTERS, {MTopic, MDest}) -> emqx_utils_stream:map( fun routeidx_to_route/1, emqx_utils_stream:ets( fun (undefined) -> MatchSpec = #routeidx{entry = emqx_trie_search:make_pat(MTopic, MDest)}, - ets:match_object(?ROUTE_TAB_FILTERS, MatchSpec, 1); + ets:match_object(Tab, MatchSpec, 1); (Cont) -> ets:match_object(Cont) end diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 9ab111fed..845ac5af2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -96,6 +96,11 @@ fields(topic) -> hoconsc:mk(binary(), #{ desc => <<"Node">>, required => true + })}, + {session, + hoconsc:mk(binary(), #{ + desc => <<"Session ID">>, + required => false })} ]. @@ -113,8 +118,8 @@ do_list(Params) -> try Pager = parse_pager_params(Params), {_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA), - QState = Pager#{continuation => undefined}, - QResult = eval_topic_query(qs2ms(Query), QState), + Stream = mk_topic_stream(qs2ms(Query)), + QResult = eval_topic_query(Stream, Pager, emqx_mgmt_api:init_query_result()), {200, format_list_response(Pager, Query, QResult)} catch throw:{error, page_limit_invalid} -> @@ -160,31 +165,37 @@ gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) -> gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) -> {MTopic, QNode}. -eval_topic_query(MS, QState) -> - finalize_query(eval_topic_query(MS, QState, emqx_mgmt_api:init_query_result())). +mk_topic_stream(Spec = {MTopic, _MDest = '_'}) -> + emqx_utils_stream:chain(emqx_router:stream(Spec), mk_persistent_topic_stream(MTopic)); +mk_topic_stream(Spec) -> + %% NOTE: Assuming that no persistent topic ever matches a query with `node` filter. + emqx_router:stream(Spec). -eval_topic_query(MS, QState, QResult) -> - case eval_topic_query_page(MS, QState) of - {Rows, '$end_of_table'} -> - {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), - NQResult#{complete => true}; - {Rows, NCont} -> - case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of - {more, NQResult} -> - eval_topic_query(MS, QState#{continuation := NCont}, NQResult); - {enough, NQResult} -> - NQResult#{complete => false} - end; - '$end_of_table' -> - QResult#{complete => true} +mk_persistent_topic_stream(Spec) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + emqx_persistent_session_ds_router:stream(Spec); + false -> + emqx_utils_stream:empty() end. -eval_topic_query_page(MS, #{limit := Limit, continuation := Cont}) -> - emqx_router:select(MS, Limit, Cont). +eval_topic_query(Stream, QState = #{limit := Limit}, QResult) -> + case emqx_utils_stream:consume(Limit, Stream) of + {Rows, NStream} -> + case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of + {more, NQResult} -> + eval_topic_query(NStream, QState, NQResult); + {enough, NQResult} -> + finalize_query(false, NQResult) + end; + Rows when is_list(Rows) -> + {_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult), + finalize_query(true, NQResult) + end. -finalize_query(QResult = #{overflow := Overflow, complete := Complete}) -> +finalize_query(Complete, QResult = #{overflow := Overflow}) -> HasNext = Overflow orelse not Complete, - QResult#{hasnext => HasNext}. + QResult#{complete => Complete, hasnext => HasNext}. format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) -> #{ @@ -205,7 +216,9 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) -> format(#route{topic = Topic, dest = {Group, Node}}) -> #{topic => ?SHARE(Group, Topic), node => Node}; format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> - #{topic => Topic, node => Node}. + #{topic => Topic, node => Node}; +format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) -> + #{topic => Topic, session => SessionId}. topic_param(In) -> { diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 574f790fc..e650b802b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -56,12 +56,10 @@ client_msgs_testcases() -> ]. init_per_suite(Config) -> - ok = snabbkaffe:start_trace(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_) -> - ok = snabbkaffe:stop(), emqx_mgmt_api_test_util:end_suite(). init_per_group(persistent_sessions, Config) -> @@ -95,10 +93,15 @@ end_per_group(persistent_sessions, Config) -> end_per_group(_Group, _Config) -> ok. +init_per_testcase(_TC, Config) -> + ok = snabbkaffe:start_trace(), + Config. + end_per_testcase(TC, _Config) when TC =:= t_inflight_messages; TC =:= t_mqueue_messages -> + ok = snabbkaffe:stop(), ClientId = atom_to_binary(TC), lists:foreach(fun(P) -> exit(P, kill) end, emqx_cm:lookup_channels(local, ClientId)), ok = emqx_common_test_helpers:wait_for( @@ -108,7 +111,7 @@ end_per_testcase(TC, _Config) when 5000 ); end_per_testcase(_TC, _Config) -> - ok. + ok = snabbkaffe:stop(). t_clients(_) -> process_flag(trap_exit, true), @@ -313,8 +316,7 @@ t_persistent_sessions2(Config) -> %% 2) Client connects to the same node and takes over, listed only once. C2 = connect_client(#{port => Port1, clientid => ClientId}), assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), - ok = emqtt:stop(C2), - ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), + ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}), ?retry( 100, 20, @@ -322,9 +324,7 @@ t_persistent_sessions2(Config) -> {ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort) ) - ), - - ok + ) end, [] ), @@ -360,10 +360,7 @@ t_persistent_sessions3(Config) -> list_request(APIPort, "node=" ++ atom_to_list(N1)) ) ), - ok = emqtt:stop(C2), - ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), - - ok + ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) end, [] ), @@ -403,10 +400,7 @@ t_persistent_sessions4(Config) -> list_request(APIPort, "node=" ++ atom_to_list(N1)) ) ), - ok = emqtt:stop(C2), - ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]), - - ok + ok = emqtt:disconnect(C2, ?RC_SUCCESS, #{'Session-Expiry-Interval' => 0}) end, [] ), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 8b362af1b..7426f9734 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -27,7 +27,7 @@ all() -> init_per_suite(Config) -> Apps = emqx_cth_suite:start( [ - emqx, + {emqx, "session_persistence.enable = true"}, emqx_management, emqx_mgmt_api_test_util:emqx_dashboard() ], @@ -204,13 +204,82 @@ t_shared_topics_invalid(_Config) -> emqx_utils_json:decode(Body, [return_maps]) ). +t_persistent_topics(_Config) -> + PersistentOpts = #{ + proto_ver => v5, + properties => #{'Session-Expiry-Interval' => 300} + }, + Client1 = client(t_persistent_topics_m1), + Client2 = client(t_persistent_topics_m2), + SessionId1 = <<"t_persistent_topics_p1">>, + SessionId2 = <<"t_persistent_topics_p2">>, + ClientPersistent1 = client(SessionId1, PersistentOpts), + ClientPersistent2 = client(SessionId2, PersistentOpts), + _ = [ + ?assertMatch({ok, _, _}, emqtt:subscribe(Client, Topic)) + || {Client, Topics} <- [ + {Client1, [<<"t/client/mem">>, <<"t/+">>]}, + {Client2, [<<"t/client/mem">>, <<"t/+">>]}, + {ClientPersistent1, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]}, + {ClientPersistent2, [<<"t/persistent/#">>, <<"t/client/ps">>, <<"t/+">>]} + ], + Topic <- Topics + ], + Matched = request_json(get, ["topics"]), + ?assertMatch( + #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 8}, + maps:get(<<"meta">>, Matched) + ), + %% Get back both topics for both persistent and in-memory subscriptions. + Expected = [ + #{<<"topic">> => <<"t/+">>, <<"node">> => atom_to_binary(node())}, + #{<<"topic">> => <<"t/+">>, <<"session">> => SessionId1}, + #{<<"topic">> => <<"t/+">>, <<"session">> => SessionId2}, + #{<<"topic">> => <<"t/client/mem">>, <<"node">> => atom_to_binary(node())}, + #{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId1}, + #{<<"topic">> => <<"t/client/ps">>, <<"session">> => SessionId2}, + #{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId1}, + #{<<"topic">> => <<"t/persistent/#">>, <<"session">> => SessionId2} + ], + ?assertEqual( + lists:sort(Expected), + lists:sort(maps:get(<<"data">>, Matched)) + ), + %% Are results the same when paginating? + #{<<"data">> := Page1} = request_json(get, ["topics"], [{"page", "1"}, {"limit", "3"}]), + #{<<"data">> := Page2} = request_json(get, ["topics"], [{"page", "2"}, {"limit", "3"}]), + #{<<"data">> := Page3} = request_json(get, ["topics"], [{"page", "3"}, {"limit", "3"}]), + ?assertEqual( + lists:sort(Expected), + lists:sort(Page1 ++ Page2 ++ Page3) + ), + %% Filtering by node makes no sense for persistent sessions. + ?assertMatch( + #{ + <<"data">> := [ + #{<<"topic">> := <<"t/client/mem">>, <<"node">> := _}, + #{<<"topic">> := <<"t/+">>, <<"node">> := _} + ], + <<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 2} + }, + request_json(get, ["topics"], [{"node", atom_to_list(node())}]) + ). + %% Utilities client(Name) -> - {ok, Client} = emqtt:start_link(#{ - username => emqx_utils_conv:bin(Name), - clientid => emqx_utils_conv:bin(Name) - }), + client(Name, #{}). + +client(Name, Overrides) -> + {ok, Client} = emqtt:start_link( + maps:merge( + #{ + username => emqx_utils_conv:bin(Name), + clientid => emqx_utils_conv:bin(Name) + }, + Overrides + ) + ), {ok, _} = emqtt:connect(Client), Client.