fix(shared_sub): restore support for `$queue/` shared subscription
Partially addresses: https://emqx.atlassian.net/browse/EMQX-4589 There's still a problem with the handling of shared groups that is _not particular to the `$queue/` syntax_ and preexistent that the same client cannot subscribe to _different groups_ that have the same _topic filter_.
This commit is contained in:
parent
ab5fd1e5c3
commit
1fdcfba629
|
@ -33,12 +33,15 @@
|
||||||
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics' prefix: $SYS | $share
|
%% Topics' prefix: $SYS | $queue | $share
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% System topic
|
%% System topic
|
||||||
-define(SYSTOP, <<"$SYS/">>).
|
-define(SYSTOP, <<"$SYS/">>).
|
||||||
|
|
||||||
|
%% Queue topic
|
||||||
|
-define(QUEUE, <<"$queue/">>).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% alarms
|
%% alarms
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -244,8 +244,12 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
|
||||||
parse(TopicFilter, Options).
|
parse(TopicFilter, Options).
|
||||||
|
|
||||||
-spec parse(topic(), map()) -> {topic(), map()}.
|
-spec parse(topic(), map()) -> {topic(), map()}.
|
||||||
|
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||||
|
error({invalid_topic_filter, TopicFilter});
|
||||||
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
||||||
error({invalid_topic_filter, TopicFilter});
|
error({invalid_topic_filter, TopicFilter});
|
||||||
|
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
|
||||||
|
parse(TopicFilter, Options#{share => <<"$queue">>});
|
||||||
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||||
case binary:split(Rest, <<"/">>) of
|
case binary:split(Rest, <<"/">>) of
|
||||||
[_Any] ->
|
[_Any] ->
|
||||||
|
|
|
@ -444,7 +444,7 @@ systopic_mon() ->
|
||||||
sharetopic() ->
|
sharetopic() ->
|
||||||
?LET(
|
?LET(
|
||||||
{Type, Grp, T},
|
{Type, Grp, T},
|
||||||
{oneof([<<"$share">>]), list(latin_char()), normal_topic()},
|
{oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
|
||||||
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
|
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
|
||||||
).
|
).
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,10 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-define(SUITE, ?MODULE).
|
-define(SUITE, ?MODULE).
|
||||||
|
|
||||||
|
@ -986,6 +988,112 @@ t_session_kicked(Config) when is_list(Config) ->
|
||||||
?assertEqual([], collect_msgs(0)),
|
?assertEqual([], collect_msgs(0)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% FIXME: currently doesn't work
|
||||||
|
%% t_different_groups_same_topic({init, Config}) ->
|
||||||
|
%% TestName = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
%% ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
|
||||||
|
%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
|
||||||
|
%% {ok, _} = emqtt:connect(C),
|
||||||
|
%% [{client, C}, {clientid, ClientId} | Config];
|
||||||
|
%% t_different_groups_same_topic({'end', Config}) ->
|
||||||
|
%% C = ?config(client, Config),
|
||||||
|
%% emqtt:stop(C),
|
||||||
|
%% ok;
|
||||||
|
%% t_different_groups_same_topic(Config) when is_list(Config) ->
|
||||||
|
%% C = ?config(client, Config),
|
||||||
|
%% ClientId = ?config(clientid, Config),
|
||||||
|
%% %% Subscribe and unsubscribe to both $queue and $shared topics
|
||||||
|
%% Topic = <<"t/1">>,
|
||||||
|
%% SharedTopic0 = <<"$share/aa/", Topic/binary>>,
|
||||||
|
%% SharedTopic1 = <<"$share/bb/", Topic/binary>>,
|
||||||
|
%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
|
||||||
|
%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
|
||||||
|
|
||||||
|
%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
|
||||||
|
%% emqx:publish(Message0),
|
||||||
|
%% ?assertMatch([ {publish, #{payload := <<"hi">>}}
|
||||||
|
%% , {publish, #{payload := <<"hi">>}}
|
||||||
|
%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
|
||||||
|
|
||||||
|
%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
|
||||||
|
%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
|
||||||
|
|
||||||
|
%% ok.
|
||||||
|
|
||||||
|
t_queue_subscription({init, Config}) ->
|
||||||
|
TestName = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
|
||||||
|
|
||||||
|
{ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
|
||||||
|
[{client, C}, {clientid, ClientId} | Config];
|
||||||
|
t_queue_subscription({'end', Config}) ->
|
||||||
|
C = ?config(client, Config),
|
||||||
|
emqtt:stop(C),
|
||||||
|
ok;
|
||||||
|
t_queue_subscription(Config) when is_list(Config) ->
|
||||||
|
C = ?config(client, Config),
|
||||||
|
ClientId = ?config(clientid, Config),
|
||||||
|
%% Subscribe and unsubscribe to both $queue and $shared topics
|
||||||
|
Topic = <<"t/1">>,
|
||||||
|
QueueTopic = <<"$queue/", Topic/binary>>,
|
||||||
|
SharedTopic = <<"$share/aa/", Topic/binary>>,
|
||||||
|
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
|
||||||
|
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
|
||||||
|
|
||||||
|
%% FIXME: we should actually see 2 routes, one for each group
|
||||||
|
%% ($queue and aa), but currently the latest subscription
|
||||||
|
%% overwrites the existing one.
|
||||||
|
?retry(
|
||||||
|
_Sleep0 = 100,
|
||||||
|
_Attempts0 = 50,
|
||||||
|
begin
|
||||||
|
ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
|
||||||
|
%% FIXME: should ensure we have 2 subscriptions
|
||||||
|
true = emqx_router:has_routes(Topic)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
|
||||||
|
%% now publish to the underlying topic
|
||||||
|
Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
|
||||||
|
emqx:publish(Message0),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
{publish, #{payload := <<"hi">>}}
|
||||||
|
%% FIXME: should receive one message from each group
|
||||||
|
%% , {publish, #{payload := <<"hi">>}}
|
||||||
|
],
|
||||||
|
collect_msgs(5_000)
|
||||||
|
),
|
||||||
|
|
||||||
|
{ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
|
||||||
|
%% FIXME: return code should be success instead of 17 ("no_subscription_existed")
|
||||||
|
{ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
|
||||||
|
|
||||||
|
%% FIXME: this should eventually be true, but currently we leak
|
||||||
|
%% the previous group subscription...
|
||||||
|
%% ?retry(
|
||||||
|
%% _Sleep0 = 100,
|
||||||
|
%% _Attempts0 = 50,
|
||||||
|
%% begin
|
||||||
|
%% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
|
||||||
|
%% false = emqx_router:has_routes(Topic)
|
||||||
|
%% end
|
||||||
|
%% ),
|
||||||
|
ct:sleep(500),
|
||||||
|
|
||||||
|
Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
|
||||||
|
emqx:publish(Message1),
|
||||||
|
%% FIXME: we should *not* receive any messages...
|
||||||
|
%% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
|
||||||
|
%% This is from the leaked group...
|
||||||
|
?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
|
||||||
|
routes => ets:tab2list(emqx_route)
|
||||||
|
}),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% help functions
|
%% help functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -211,6 +211,10 @@ t_systop(_) ->
|
||||||
?assertEqual(SysTop2, systop(<<"abc">>)).
|
?assertEqual(SysTop2, systop(<<"abc">>)).
|
||||||
|
|
||||||
t_feed_var(_) ->
|
t_feed_var(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
<<"$queue/client/clientId">>,
|
||||||
|
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
|
||||||
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
<<"username/test/client/x">>,
|
<<"username/test/client/x">>,
|
||||||
feed_var(
|
feed_var(
|
||||||
|
@ -232,6 +236,10 @@ long_topic() ->
|
||||||
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
|
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
|
||||||
|
|
||||||
t_parse(_) ->
|
t_parse(_) ->
|
||||||
|
?assertError(
|
||||||
|
{invalid_topic_filter, <<"$queue/t">>},
|
||||||
|
parse(<<"$queue/t">>, #{share => <<"g">>})
|
||||||
|
),
|
||||||
?assertError(
|
?assertError(
|
||||||
{invalid_topic_filter, <<"$share/g/t">>},
|
{invalid_topic_filter, <<"$share/g/t">>},
|
||||||
parse(<<"$share/g/t">>, #{share => <<"g">>})
|
parse(<<"$share/g/t">>, #{share => <<"g">>})
|
||||||
|
@ -246,9 +254,11 @@ t_parse(_) ->
|
||||||
),
|
),
|
||||||
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
|
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
|
||||||
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
|
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
|
||||||
|
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
|
||||||
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
|
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
|
||||||
%% The '$local' and '$fastlane' topics have been deprecated.
|
%% The '$local' and '$fastlane' topics have been deprecated.
|
||||||
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
|
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
|
||||||
|
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
|
||||||
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
|
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
|
||||||
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).
|
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).
|
||||||
|
|
||||||
|
|
|
@ -187,8 +187,10 @@ format(WhichNode, {{Topic, _Subscriber}, Options}) ->
|
||||||
maps:with([qos, nl, rap, rh], Options)
|
maps:with([qos, nl, rap, rh], Options)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
get_topic(Topic, #{share := <<"$queue">> = Group}) ->
|
||||||
|
filename:join([Group, Topic]);
|
||||||
get_topic(Topic, #{share := Group}) ->
|
get_topic(Topic, #{share := Group}) ->
|
||||||
emqx_topic:join([<<"$share">>, Group, Topic]);
|
filename:join([<<"$share">>, Group, Topic]);
|
||||||
get_topic(Topic, _) ->
|
get_topic(Topic, _) ->
|
||||||
Topic.
|
Topic.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Restored support for the special `$queue/` shared subscription.
|
Loading…
Reference in New Issue