diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index faf3f4828..ac9d297de 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -33,12 +33,15 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). %%-------------------------------------------------------------------- -%% Topics' prefix: $SYS | $share +%% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- %% System topic -define(SYSTOP, <<"$SYS/">>). +%% Queue topic +-define(QUEUE, <<"$queue/">>). + %%-------------------------------------------------------------------- %% alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index c1515e14b..6d232c68d 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -244,8 +244,12 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). -spec parse(topic(), map()) -> {topic(), map()}. +parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> + error({invalid_topic_filter, TopicFilter}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter}); +parse(<<"$queue/", TopicFilter/binary>>, Options) -> + parse(TopicFilter, Options#{share => <<"$queue">>}); parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 6d1ced486..0e9d3032c 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -444,7 +444,7 @@ systopic_mon() -> sharetopic() -> ?LET( {Type, Grp, T}, - {oneof([<<"$share">>]), list(latin_char()), normal_topic()}, + {oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()}, <> ). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index e280f4fe5..4726f1111 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -20,8 +20,10 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SUITE, ?MODULE). @@ -986,6 +988,112 @@ t_session_kicked(Config) when is_list(Config) -> ?assertEqual([], collect_msgs(0)), ok. +%% FIXME: currently doesn't work +%% t_different_groups_same_topic({init, Config}) -> +%% TestName = atom_to_binary(?FUNCTION_NAME), +%% ClientId = <>, +%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), +%% {ok, _} = emqtt:connect(C), +%% [{client, C}, {clientid, ClientId} | Config]; +%% t_different_groups_same_topic({'end', Config}) -> +%% C = ?config(client, Config), +%% emqtt:stop(C), +%% ok; +%% t_different_groups_same_topic(Config) when is_list(Config) -> +%% C = ?config(client, Config), +%% ClientId = ?config(clientid, Config), +%% %% Subscribe and unsubscribe to both $queue and $shared topics +%% Topic = <<"t/1">>, +%% SharedTopic0 = <<"$share/aa/", Topic/binary>>, +%% SharedTopic1 = <<"$share/bb/", Topic/binary>>, +%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}), +%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}), + +%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), +%% emqx:publish(Message0), +%% ?assertMatch([ {publish, #{payload := <<"hi">>}} +%% , {publish, #{payload := <<"hi">>}} +%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}), + +%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0), +%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1), + +%% ok. + +t_queue_subscription({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + + [{client, C}, {clientid, ClientId} | Config]; +t_queue_subscription({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_queue_subscription(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to both $queue and $shared topics + Topic = <<"t/1">>, + QueueTopic = <<"$queue/", Topic/binary>>, + SharedTopic = <<"$share/aa/", Topic/binary>>, + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}), + + %% FIXME: we should actually see 2 routes, one for each group + %% ($queue and aa), but currently the latest subscription + %% overwrites the existing one. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), + %% FIXME: should ensure we have 2 subscriptions + true = emqx_router:has_routes(Topic) + end + ), + + %% now publish to the underlying topic + Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}} + %% FIXME: should receive one message from each group + %% , {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000) + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic), + %% FIXME: return code should be success instead of 17 ("no_subscription_existed") + {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic), + + %% FIXME: this should eventually be true, but currently we leak + %% the previous group subscription... + %% ?retry( + %% _Sleep0 = 100, + %% _Attempts0 = 50, + %% begin + %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), + %% false = emqx_router:has_routes(Topic) + %% end + %% ), + ct:sleep(500), + + Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>), + emqx:publish(Message1), + %% FIXME: we should *not* receive any messages... + %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), + %% This is from the leaked group... + ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{ + routes => ets:tab2list(emqx_route) + }), + + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index 521efe751..c49c93fb2 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -211,6 +211,10 @@ t_systop(_) -> ?assertEqual(SysTop2, systop(<<"abc">>)). t_feed_var(_) -> + ?assertEqual( + <<"$queue/client/clientId">>, + feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>) + ), ?assertEqual( <<"username/test/client/x">>, feed_var( @@ -232,6 +236,10 @@ long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]). t_parse(_) -> + ?assertError( + {invalid_topic_filter, <<"$queue/t">>}, + parse(<<"$queue/t">>, #{share => <<"g">>}) + ), ?assertError( {invalid_topic_filter, <<"$share/g/t">>}, parse(<<"$share/g/t">>, #{share => <<"g">>}) @@ -246,9 +254,11 @@ t_parse(_) -> ), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), + ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)), %% The '$local' and '$fastlane' topics have been deprecated. ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)), + ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index dc35f5b97..4f43d0588 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -187,8 +187,10 @@ format(WhichNode, {{Topic, _Subscriber}, Options}) -> maps:with([qos, nl, rap, rh], Options) ). +get_topic(Topic, #{share := <<"$queue">> = Group}) -> + filename:join([Group, Topic]); get_topic(Topic, #{share := Group}) -> - emqx_topic:join([<<"$share">>, Group, Topic]); + filename:join([<<"$share">>, Group, Topic]); get_topic(Topic, _) -> Topic. diff --git a/changes/ce/fix-11281.en.md b/changes/ce/fix-11281.en.md new file mode 100644 index 000000000..a73159343 --- /dev/null +++ b/changes/ce/fix-11281.en.md @@ -0,0 +1 @@ +Restored support for the special `$queue/` shared subscription.