Merge pull request #9412 from olcai/remove-dollar-queue

feat: remove $queue in favor of $share
This commit is contained in:
Zaiming (Stone) Shi 2022-11-28 15:54:56 +01:00 committed by GitHub
commit 6a76a2d885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 9 additions and 22 deletions

View File

@ -40,15 +40,12 @@
-define(ERTS_MINIMUM_REQUIRED, "10.0").
%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%% Topics' prefix: $SYS | $share
%%--------------------------------------------------------------------
%% System topic
-define(SYSTOP, <<"$SYS/">>).
%% Queue topic
-define(QUEUE, <<"$queue/">>).
%%--------------------------------------------------------------------
%% alarms
%%--------------------------------------------------------------------

View File

@ -210,12 +210,8 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
-spec parse(topic(), map()) -> {topic(), map()}.
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(TopicFilter, Options#{share => <<"$queue">>});
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
case binary:split(Rest, <<"/">>) of
[_Any] ->

View File

@ -424,7 +424,7 @@ systopic_mon() ->
sharetopic() ->
?LET(
{Type, Grp, T},
{oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
{oneof([<<"$share">>]), list(latin_char()), normal_topic()},
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
).

View File

@ -186,10 +186,6 @@ t_systop(_) ->
?assertEqual(SysTop2, systop(<<"abc">>)).
t_feed_var(_) ->
?assertEqual(
<<"$queue/client/clientId">>,
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
),
?assertEqual(
<<"username/test/client/x">>,
feed_var(
@ -211,10 +207,6 @@ long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
t_parse(_) ->
?assertError(
{invalid_topic_filter, <<"$queue/t">>},
parse(<<"$queue/t">>, #{share => <<"g">>})
),
?assertError(
{invalid_topic_filter, <<"$share/g/t">>},
parse(<<"$share/g/t">>, #{share => <<"g">>})
@ -229,11 +221,9 @@ t_parse(_) ->
),
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
%% The '$local' and '$fastlane' topics have been deprecated.
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).

View File

@ -182,10 +182,8 @@ format({_Subscriber, Topic, Options}) ->
maps:with([qos, nl, rap, rh], Options)
).
get_topic(Topic, #{share := <<"$queue">> = Group}) ->
filename:join([Group, Topic]);
get_topic(Topic, #{share := Group}) ->
filename:join([<<"$share">>, Group, Topic]);
emqx_topic:join([<<"$share">>, Group, Topic]);
get_topic(Topic, _) ->
Topic.

View File

@ -8,4 +8,7 @@
- Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423).
- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412).
Shared subscriptions are now part of the MQTT spec. Use `$share` instead.
## Bug fixes

View File

@ -4,6 +4,9 @@
- 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。
- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。
共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`
## 修复
- 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。