diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index b776adab4..b1b1e2a00 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -40,15 +40,12 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). %%-------------------------------------------------------------------- -%% Topics' prefix: $SYS | $queue | $share +%% Topics' prefix: $SYS | $share %%-------------------------------------------------------------------- %% System topic -define(SYSTOP, <<"$SYS/">>). -%% Queue topic --define(QUEUE, <<"$queue/">>). - %%-------------------------------------------------------------------- %% alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 0f0a8bf1c..2941a51fe 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -210,12 +210,8 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). -spec parse(topic(), map()) -> {topic(), map()}. -parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> - error({invalid_topic_filter, TopicFilter}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter}); -parse(<<"$queue/", TopicFilter/binary>>, Options) -> - parse(TopicFilter, Options#{share => <<"$queue">>}); parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 78fab0b38..0be10f476 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -424,7 +424,7 @@ systopic_mon() -> sharetopic() -> ?LET( {Type, Grp, T}, - {oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()}, + {oneof([<<"$share">>]), list(latin_char()), normal_topic()}, <> ). diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index 13383b69e..ff0ec92a0 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -186,10 +186,6 @@ t_systop(_) -> ?assertEqual(SysTop2, systop(<<"abc">>)). t_feed_var(_) -> - ?assertEqual( - <<"$queue/client/clientId">>, - feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>) - ), ?assertEqual( <<"username/test/client/x">>, feed_var( @@ -211,10 +207,6 @@ long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]). t_parse(_) -> - ?assertError( - {invalid_topic_filter, <<"$queue/t">>}, - parse(<<"$queue/t">>, #{share => <<"g">>}) - ), ?assertError( {invalid_topic_filter, <<"$share/g/t">>}, parse(<<"$share/g/t">>, #{share => <<"g">>}) @@ -229,11 +221,9 @@ t_parse(_) -> ), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), - ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)), %% The '$local' and '$fastlane' topics have been deprecated. ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)), - ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 03b833e84..062b915b2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -182,10 +182,8 @@ format({_Subscriber, Topic, Options}) -> maps:with([qos, nl, rap, rh], Options) ). -get_topic(Topic, #{share := <<"$queue">> = Group}) -> - filename:join([Group, Topic]); get_topic(Topic, #{share := Group}) -> - filename:join([<<"$share">>, Group, Topic]); + emqx_topic:join([<<"$share">>, Group, Topic]); get_topic(Topic, _) -> Topic. diff --git a/changes/v5.0.12-en.md b/changes/v5.0.12-en.md index bb3df6bc7..5c6f86832 100644 --- a/changes/v5.0.12-en.md +++ b/changes/v5.0.12-en.md @@ -8,4 +8,7 @@ - Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423). +- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412). + Shared subscriptions are now part of the MQTT spec. Use `$share` instead. + ## Bug fixes diff --git a/changes/v5.0.12-zh.md b/changes/v5.0.12-zh.md index fad19995e..53d8819e9 100644 --- a/changes/v5.0.12-zh.md +++ b/changes/v5.0.12-zh.md @@ -4,6 +4,9 @@ - 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。 +- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。 + 共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`。 + ## 修复 - 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。