feat(queue): add shared sub support to the management API

This commit is contained in:
Ilya Averyanov 2024-07-12 17:22:11 +03:00
parent 9b30320ddb
commit f0dd1bc4f4
3 changed files with 151 additions and 28 deletions

View File

@ -0,0 +1,96 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_mgmt_api_subscription_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"durable_sessions {\n"
" enable = true\n"
" renew_streams_interval = 10ms\n"
"}"},
{emqx_ds_shared_sub, #{
config => #{
<<"durable_queues">> => #{
<<"enable">> => true,
<<"session_find_leader_timeout_ms">> => "1200ms"
}
}
}},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_TC, Config) ->
ClientConfig = #{
username => ?USERNAME,
clientid => ?CLIENTID,
proto_ver => v5,
clean_start => true,
properties => #{'Session-Expiry-Interval' => 300}
},
{ok, Client} = emqtt:start_link(ClientConfig),
{ok, _} = emqtt:connect(Client),
[{client_config, ClientConfig}, {client, Client} | Config].
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client).
t_list_with_shared_sub(_Config) ->
Client = proplists:get_value(client, _Config),
RealTopic = <<"t/+">>,
Topic = <<"$share/g1/", RealTopic/binary>>,
{ok, _, _} = emqtt:subscribe(Client, Topic),
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
QS0 = [
{"clientid", ?CLIENTID},
{"match_topic", "t/#"}
],
Headers = emqx_mgmt_api_test_util:auth_header_(),
?assertMatch(
#{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]},
request_json(get, QS0, Headers)
),
QS1 = [
{"clientid", ?CLIENTID},
{"share_group", "g1"}
],
?assertMatch(
#{<<"data">> := [#{<<"clientid">> := ?CLIENTID, <<"topic">> := <<"$share/g1/t/+">>}]},
request_json(get, QS1, Headers)
).
request_json(Method, Query, Headers) when is_list(Query) ->
Qs = uri_string:compose_query(Query),
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
emqx_utils_json:decode(MatchRes, [return_maps]).
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).

View File

@ -242,20 +242,25 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} =
%% TODO: filtering by client ID can be implemented more efficiently:
FilterTopic = maps:get(<<"topic">>, QString, '_'),
Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic),
SubPred = fun(Sub) ->
compare_optional(<<"topic">>, QString, topic, Sub) andalso
compare_optional(<<"topic">>, QString, '_real_topic', Sub) andalso
compare_optional(<<"clientid">>, QString, clientid, Sub) andalso
compare_optional(<<"qos">>, QString, qos, Sub) andalso
compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub)
compare_optional(<<"share_group">>, QString, '_group', Sub) andalso
compare_match_topic_optional(<<"match_topic">>, QString, '_real_topic', Sub)
end,
NDropped = (Page - 1) * Limit,
{_, Stream} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0
),
{Subscriptions, Stream1} = consume_n_matching(
{Subscriptions0, Stream1} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, Limit, Stream
),
HasNext = Stream1 =/= [],
Subscriptions1 = lists:map(
fun remove_temp_match_fields/1, Subscriptions0
),
Meta =
case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of
true ->
@ -276,7 +281,7 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} =
#{
meta => Meta,
data => Subscriptions
data => Subscriptions1
}.
compare_optional(QField, Query, SField, Subscription) ->
@ -329,6 +334,7 @@ consume_n_matching(Map, Pred, N, S0, Acc) ->
end.
persistent_route_to_subscription(#route{dest = Dest} = Route) ->
Sub =
case get_client_subscription(Route) of
#{subopts := SubOpts} ->
#{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
@ -350,16 +356,36 @@ persistent_route_to_subscription(#route{dest = Dest} = Route) ->
node => all,
durable => true
}
end.
end,
add_temp_match_fields(Route, Sub).
get_client_subscription(#route{topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}}) ->
emqx_persistent_session_ds:get_client_subscription(SessionId, #share{topic = Topic, group = Group});
get_client_subscription(#route{
topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}
}) ->
emqx_persistent_session_ds:get_client_subscription(SessionId, #share{
topic = Topic, group = Group
});
get_client_subscription(#route{topic = Topic, dest = SessionId}) ->
emqx_persistent_session_ds:get_client_subscription(SessionId, Topic).
session_id(#share_dest{session_id = SessionId}) -> SessionId;
session_id(SessionId) -> SessionId.
add_temp_match_fields(Route, Sub) ->
add_temp_match_fields(['_real_topic', '_group'], Route, Sub).
add_temp_match_fields([], _Route, Sub) ->
Sub;
add_temp_match_fields(['_real_topic' | Rest], #route{topic = Topic} = Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_real_topic' => Topic});
add_temp_match_fields(['_group' | Rest], #route{dest = #share_dest{group = Group}} = Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_group' => Group});
add_temp_match_fields(['_group' | Rest], Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_group' => undefined}).
remove_temp_match_fields(Sub) ->
maps:without(['_real_topic', '_group'], Sub).
format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) ->
<<"$share/", Group/binary, "/", Topic/binary>>;
format_topic(#route{topic = Topic}) ->

View File

@ -47,7 +47,8 @@ groups() ->
CommonTCs = AllTCs -- persistent_only_tcs(),
[
{mem, CommonTCs},
%% Shared subscriptions are currently not supported:
%% Persistent shared subscriptions are an EE app.
%% So they are tested outside emqx_management app which is CE.
{persistent,
(CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()}
].