From 180130d684af17cdf87e097a48cd3f983b0f7d36 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 16:46:31 +0200 Subject: [PATCH] feat(sessds): List persistent subscriptions in the REST API --- .../src/emqx_mgmt_api_subscriptions.erl | 210 +++++++++++++++++- .../test/emqx_mgmt_api_subscription_SUITE.erl | 91 ++++++-- changes/ce/fix-12874.en.md | 7 + 3 files changed, 285 insertions(+), 23 deletions(-) create mode 100644 changes/ce/fix-12874.en.md diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 9976bf881..b1a8fbce2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -176,7 +176,8 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> #{ topic => emqx_topic:maybe_format_share(Topic), clientid => maps:get(subid, SubOpts, null), - node => WhichNode + node => WhichNode, + durable => false }, maps:with([qos, nl, rap, rh], SubOpts) ). @@ -196,7 +197,22 @@ check_match_topic(#{<<"match_topic">> := MatchTopic}) -> check_match_topic(_) -> ok. -do_subscriptions_query(QString) -> +do_subscriptions_query(QString0) -> + {IsDurable, QString} = maps:take( + <<"durable">>, maps:merge(#{<<"durable">> => undefined}, QString0) + ), + case emqx_persistent_message:is_persistence_enabled() andalso IsDurable of + false -> + do_subscriptions_query_mem(QString); + true -> + do_subscriptions_query_persistent(QString); + undefined -> + merge_queries( + QString, fun do_subscriptions_query_mem/1, fun do_subscriptions_query_persistent/1 + ) + end. + +do_subscriptions_query_mem(QString) -> Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], case maps:get(<<"node">>, QString, undefined) of undefined -> @@ -210,8 +226,196 @@ do_subscriptions_query(QString) -> end end. +do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) -> + Count = emqx_persistent_session_ds_router:stats(n_routes), + %% TODO: filtering by client ID can be implemented more efficiently: + FilterTopic = maps:get(<<"topic">>, QString, '_'), + Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic), + SubPred = fun(Sub) -> + compare_optional(<<"topic">>, QString, topic, Sub) andalso + compare_optional(<<"clientid">>, QString, clientid, Sub) andalso + compare_optional(<<"qos">>, QString, qos, Sub) andalso + compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub) + end, + NDropped = (Page - 1) * Limit, + {_, Stream} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0 + ), + {Subscriptions, Stream1} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, Limit, Stream + ), + HasNext = Stream1 =/= [], + Meta = + case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of + true -> + %% Fuzzy searches shouldn't return count: + #{ + limit => Limit, + page => Page, + hasnext => HasNext + }; + false -> + #{ + count => Count, + limit => Limit, + page => Page, + hasnext => HasNext + } + end, + + #{ + meta => Meta, + data => Subscriptions + }. + +compare_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := Expected} -> + maps:get(SField, Subscription) =:= Expected; + _ -> + true + end. + +compare_match_topic_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := TopicFilter} -> + Topic = maps:get(SField, Subscription), + emqx_topic:match(Topic, TopicFilter); + _ -> + true + end. + +%% @doc Drop elements from the stream until encountered N elements +%% matching the predicate function. +-spec consume_n_matching( + fun((T) -> Q), + fun((Q) -> boolean()), + non_neg_integer(), + emqx_utils_stream:stream(T) +) -> {[Q], emqx_utils_stream:stream(T) | empty}. +consume_n_matching(Map, Pred, N, S) -> + consume_n_matching(Map, Pred, N, S, []). + +consume_n_matching(_Map, _Pred, _N, [], Acc) -> + {lists:reverse(Acc), []}; +consume_n_matching(_Map, _Pred, 0, S, Acc) -> + {lists:reverse(Acc), S}; +consume_n_matching(Map, Pred, N, S0, Acc) -> + case emqx_utils_stream:next(S0) of + [] -> + consume_n_matching(Map, Pred, N, [], Acc); + [Elem | S] -> + Mapped = Map(Elem), + case Pred(Mapped) of + true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]); + false -> consume_n_matching(Map, Pred, N, S, Acc) + end + end. + +persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) -> + case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of + #{subopts := SubOpts} -> + #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, + #{ + topic => Topic, + clientid => SessionId, + node => all, + + qos => Qos, + nl => Nl, + rh => Rh, + rap => Rap, + durable => true + }; + undefined -> + #{ + topic => Topic, + clientid => SessionId, + node => all, + durable => true + } + end. + +%% @private This function merges paginated results from two sources. +%% +%% Note: this implementation is far from ideal: `count' for the +%% queries may be missing, it may be larger than the actual number of +%% elements. This may lead to empty pages that can confuse the user. +%% +%% Not much can be done to mitigate that, though: since the count may +%% be incorrect, we cannot run simple math to determine when one +%% stream begins and another ends: it requires actual iteration. +%% +%% Ideally, the dashboard must be split between durable and mem +%% subscriptions, and this function should be removed for good. +merge_queries(QString0, Q1, Q2) -> + #{<<"limit">> := Limit, <<"page">> := Page} = QString0, + C1 = resp_count(QString0, Q1), + C2 = resp_count(QString0, Q2), + Meta = + case is_number(C1) andalso is_number(C2) of + true -> + #{ + count => C1 + C2, + limit => Limit, + page => Page + }; + false -> + #{ + limit => Limit, + page => Page + } + end, + case {C1, C2} of + {_, 0} -> + %% The second query is empty. Just return the result of Q1 as usual: + Q1(QString0); + {0, _} -> + %% The first query is empty. Just return the result of Q2 as usual: + Q2(QString0); + _ when is_number(C1) -> + %% Both queries are potentially non-empty, but we at least + %% have the page number for the first query. We try to + %% stich the pages together and thus respect the limit + %% (except for the page where the results switch from Q1 + %% to Q2). + + %% Page where data from the second query is estimated to + %% begin: + Q2Page = ceil(C1 / Limit), + case Page =< Q2Page of + true -> + #{data := Data, meta := #{hasnext := HN}} = Q1(QString0), + #{ + data => Data, + meta => Meta#{hasnext => HN orelse C2 > 0} + }; + false -> + QString = QString0#{<<"page">> => Page - Q2Page}, + #{data := Data, meta := #{hasnext := HN}} = Q2(QString), + #{data => Data, meta => Meta#{hasnext => HN}} + end; + _ -> + %% We don't know how many items is there in the first + %% query, and the second query is not empty (this includes + %% the case where `C2' is `undefined'). Best we can do is + %% to interleave the queries. This may produce less + %% results per page than `Limit'. + QString = QString0#{<<"limit">> => ceil(Limit / 2)}, + #{data := D1, meta := #{hasnext := HN1}} = Q1(QString), + #{data := D2, meta := #{hasnext := HN2}} = Q2(QString), + #{ + meta => Meta#{hasnext => HN1 or HN2}, + data => D1 ++ D2 + } + end. + +resp_count(Query, QFun) -> + #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}), + maps:get(count, Meta, undefined). + %%-------------------------------------------------------------------- -%% QueryString to MatchSpec +%% QueryString to MatchSpec (mem sessions) %%-------------------------------------------------------------------- -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 356ae97e4..435a837e3 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -36,17 +36,72 @@ -define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, mem}, + {group, persistent} + ]. + +groups() -> + CommonTCs = emqx_common_test_helpers:all(?MODULE), + [ + {mem, CommonTCs}, + %% Shared subscriptions are currently not supported: + {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]} + ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence {\n" + " enable = true\n" + " renew_streams_interval = 10ms\n" + "}"}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +init_per_group(persistent, Config) -> + ClientConfig = #{ + username => ?USERNAME, + clientid => ?CLIENTID, + proto_ver => v5, + clean_start => true, + properties => #{'Session-Expiry-Interval' => 300} + }, + [{client_config, ClientConfig}, {durable, true} | Config]; +init_per_group(mem, Config) -> + ClientConfig = #{ + username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5, clean_start => true + }, + [{client_config, ClientConfig}, {durable, false} | Config]. + +end_per_group(_, Config) -> Config. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +init_per_testcase(_TC, Config) -> + case ?config(client_config, Config) of + ClientConfig when is_map(ClientConfig) -> + {ok, Client} = emqtt:start_link(ClientConfig), + {ok, _} = emqtt:connect(Client), + [{client, Client} | Config]; + _ -> + Config + end. + +end_per_testcase(_TC, Config) -> + Client = proplists:get_value(client, Config), + emqtt:disconnect(Client). t_subscription_api(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), {ok, _, _} = emqtt:subscribe( Client, [ {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} @@ -54,12 +109,13 @@ t_subscription_api(Config) -> ), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), + timer:sleep(100), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), Data = emqx_utils_json:decode(Response, [return_maps]), Meta = maps:get(<<"meta">>, Data), ?assertEqual(1, maps:get(<<"page">>, Meta)), ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), - ?assertEqual(2, maps:get(<<"count">>, Meta)), + ?assertEqual(2, maps:get(<<"count">>, Meta), Data), Subscriptions = maps:get(<<"data">>, Data), ?assertEqual(length(Subscriptions), 2), Sort = @@ -90,7 +146,8 @@ t_subscription_api(Config) -> {"node", atom_to_list(node())}, {"qos", "0"}, {"share_group", "test_group"}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], Headers = emqx_mgmt_api_test_util:auth_header_(), @@ -103,6 +160,7 @@ t_subscription_api(Config) -> t_subscription_fuzzy_search(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), Topics = [ <<"t/foo">>, <<"t/foo/bar">>, @@ -116,7 +174,8 @@ t_subscription_fuzzy_search(Config) -> MatchQs = [ {"clientid", ?CLIENTID}, {"node", atom_to_list(node())}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), @@ -130,12 +189,13 @@ t_subscription_fuzzy_search(Config) -> LimitMatchQuery = [ {"clientid", ?CLIENTID}, {"match_topic", "+/+/+"}, - {"limit", "3"} + {"limit", "3"}, + {"durable", Durable} ], MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers), ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2), - ?assertEqual(3, length(maps:get(<<"data">>, MatchData2))), + ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2), MatchData2P2 = #{<<"meta">> := MatchMeta2P2} = @@ -176,8 +236,8 @@ t_list_with_shared_sub(_Config) -> ok. -t_list_with_invalid_match_topic(_Config) -> - Client = proplists:get_value(client, _Config), +t_list_with_invalid_match_topic(Config) -> + Client = proplists:get_value(client, Config), RealTopic = <<"t/+">>, Topic = <<"$share/g1/", RealTopic/binary>>, @@ -212,12 +272,3 @@ request_json(Method, Query, Headers) when is_list(Query) -> path() -> emqx_mgmt_api_test_util:api_path(["subscriptions"]). - -init_per_testcase(_TC, Config) -> - {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), - {ok, _} = emqtt:connect(Client), - [{client, Client} | Config]. - -end_per_testcase(_TC, Config) -> - Client = proplists:get_value(client, Config), - emqtt:disconnect(Client). diff --git a/changes/ce/fix-12874.en.md b/changes/ce/fix-12874.en.md new file mode 100644 index 000000000..1a5814b07 --- /dev/null +++ b/changes/ce/fix-12874.en.md @@ -0,0 +1,7 @@ +- Ensure consistency of the durable message replay when the subscriptions are modified before session reconnects + +- Persistent sessions save inflight packet IDs for the received QoS2 messages + +- Make behavior of the persistent sessions consistent with the non-persistent sessions in regard to overlapping subscriptions + +- List persistent subscriptions in the REST API