diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_mgmt_api_subscription_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_mgmt_api_subscription_SUITE.erl new file mode 100644 index 000000000..ce73aa59f --- /dev/null +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_mgmt_api_subscription_SUITE.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_mgmt_api_subscription_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(CLIENTID, <<"api_clientid">>). +-define(USERNAME, <<"api_username">>). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "durable_sessions {\n" + " enable = true\n" + " renew_streams_interval = 10ms\n" + "}"}, + {emqx_ds_shared_sub, #{ + config => #{ + <<"durable_queues">> => #{ + <<"enable">> => true, + <<"session_find_leader_timeout_ms">> => "1200ms" + } + } + }}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +init_per_testcase(_TC, Config) -> + ClientConfig = #{ + username => ?USERNAME, + clientid => ?CLIENTID, + proto_ver => v5, + clean_start => true, + properties => #{'Session-Expiry-Interval' => 300} + }, + + {ok, Client} = emqtt:start_link(ClientConfig), + {ok, _} = emqtt:connect(Client), + [{client_config, ClientConfig}, {client, Client} | Config]. + +end_per_testcase(_TC, Config) -> + Client = proplists:get_value(client, Config), + emqtt:disconnect(Client). + +t_list_with_shared_sub(_Config) -> + Client = proplists:get_value(client, _Config), + RealTopic = <<"t/+">>, + Topic = <<"$share/g1/", RealTopic/binary>>, + + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + QS0 = [ + {"clientid", ?CLIENTID}, + {"match_topic", "t/#"} + ], + Headers = emqx_mgmt_api_test_util:auth_header_(), + + ?assertMatch( + #{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]}, + request_json(get, QS0, Headers) + ), + + QS1 = [ + {"clientid", ?CLIENTID}, + {"share_group", "g1"} + ], + + ?assertMatch( + #{<<"data">> := [#{<<"clientid">> := ?CLIENTID, <<"topic">> := <<"$share/g1/t/+">>}]}, + request_json(get, QS1, Headers) + ). + +request_json(Method, Query, Headers) when is_list(Query) -> + Qs = uri_string:compose_query(Query), + {ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers), + emqx_utils_json:decode(MatchRes, [return_maps]). + +path() -> + emqx_mgmt_api_test_util:api_path(["subscriptions"]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index c4aa55463..b662061a6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -242,20 +242,25 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = %% TODO: filtering by client ID can be implemented more efficiently: FilterTopic = maps:get(<<"topic">>, QString, '_'), Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic), + SubPred = fun(Sub) -> - compare_optional(<<"topic">>, QString, topic, Sub) andalso + compare_optional(<<"topic">>, QString, '_real_topic', Sub) andalso compare_optional(<<"clientid">>, QString, clientid, Sub) andalso compare_optional(<<"qos">>, QString, qos, Sub) andalso - compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub) + compare_optional(<<"share_group">>, QString, '_group', Sub) andalso + compare_match_topic_optional(<<"match_topic">>, QString, '_real_topic', Sub) end, NDropped = (Page - 1) * Limit, {_, Stream} = consume_n_matching( fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0 ), - {Subscriptions, Stream1} = consume_n_matching( + {Subscriptions0, Stream1} = consume_n_matching( fun persistent_route_to_subscription/1, SubPred, Limit, Stream ), HasNext = Stream1 =/= [], + Subscriptions1 = lists:map( + fun remove_temp_match_fields/1, Subscriptions0 + ), Meta = case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of true -> @@ -276,7 +281,7 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = #{ meta => Meta, - data => Subscriptions + data => Subscriptions1 }. compare_optional(QField, Query, SField, Subscription) -> @@ -329,37 +334,58 @@ consume_n_matching(Map, Pred, N, S0, Acc) -> end. persistent_route_to_subscription(#route{dest = Dest} = Route) -> - case get_client_subscription(Route) of - #{subopts := SubOpts} -> - #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, - #{ - topic => format_topic(Route), - clientid => session_id(Dest), - node => all, + Sub = + case get_client_subscription(Route) of + #{subopts := SubOpts} -> + #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, + #{ + topic => format_topic(Route), + clientid => session_id(Dest), + node => all, - qos => Qos, - nl => Nl, - rh => Rh, - rap => Rap, - durable => true - }; - undefined -> - #{ - topic => format_topic(Route), - clientid => session_id(Dest), - node => all, - durable => true - } - end. + qos => Qos, + nl => Nl, + rh => Rh, + rap => Rap, + durable => true + }; + undefined -> + #{ + topic => format_topic(Route), + clientid => session_id(Dest), + node => all, + durable => true + } + end, + add_temp_match_fields(Route, Sub). -get_client_subscription(#route{topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}}) -> - emqx_persistent_session_ds:get_client_subscription(SessionId, #share{topic = Topic, group = Group}); +get_client_subscription(#route{ + topic = Topic, dest = #share_dest{session_id = SessionId, group = Group} +}) -> + emqx_persistent_session_ds:get_client_subscription(SessionId, #share{ + topic = Topic, group = Group + }); get_client_subscription(#route{topic = Topic, dest = SessionId}) -> emqx_persistent_session_ds:get_client_subscription(SessionId, Topic). session_id(#share_dest{session_id = SessionId}) -> SessionId; session_id(SessionId) -> SessionId. +add_temp_match_fields(Route, Sub) -> + add_temp_match_fields(['_real_topic', '_group'], Route, Sub). + +add_temp_match_fields([], _Route, Sub) -> + Sub; +add_temp_match_fields(['_real_topic' | Rest], #route{topic = Topic} = Route, Sub) -> + add_temp_match_fields(Rest, Route, Sub#{'_real_topic' => Topic}); +add_temp_match_fields(['_group' | Rest], #route{dest = #share_dest{group = Group}} = Route, Sub) -> + add_temp_match_fields(Rest, Route, Sub#{'_group' => Group}); +add_temp_match_fields(['_group' | Rest], Route, Sub) -> + add_temp_match_fields(Rest, Route, Sub#{'_group' => undefined}). + +remove_temp_match_fields(Sub) -> + maps:without(['_real_topic', '_group'], Sub). + format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) -> <<"$share/", Group/binary, "/", Topic/binary>>; format_topic(#route{topic = Topic}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 9a55fa1a0..274e0c5dd 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -47,7 +47,8 @@ groups() -> CommonTCs = AllTCs -- persistent_only_tcs(), [ {mem, CommonTCs}, - %% Shared subscriptions are currently not supported: + %% Persistent shared subscriptions are an EE app. + %% So they are tested outside emqx_management app which is CE. {persistent, (CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()} ].