feat: remove $queue in favor of $shared
This commit removes support for setting shared subscriptions with the non-standard $queue feature. Shared subscriptions is now part of the MQTT spec (using $shared) and we will only support that from now on.
This commit is contained in:
parent
7c048081b1
commit
95faf56077
|
@ -40,15 +40,12 @@
|
|||
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Topics' prefix: $SYS | $queue | $share
|
||||
%% Topics' prefix: $SYS | $share
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% System topic
|
||||
-define(SYSTOP, <<"$SYS/">>).
|
||||
|
||||
%% Queue topic
|
||||
-define(QUEUE, <<"$queue/">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% alarms
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -210,12 +210,8 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
|
|||
parse(TopicFilter, Options).
|
||||
|
||||
-spec parse(topic(), map()) -> {topic(), map()}.
|
||||
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
|
||||
error({invalid_topic_filter, TopicFilter});
|
||||
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
|
||||
parse(TopicFilter, Options#{share => <<"$queue">>});
|
||||
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
|
||||
case binary:split(Rest, <<"/">>) of
|
||||
[_Any] ->
|
||||
|
|
|
@ -424,7 +424,7 @@ systopic_mon() ->
|
|||
sharetopic() ->
|
||||
?LET(
|
||||
{Type, Grp, T},
|
||||
{oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
|
||||
{oneof([<<"$share">>]), list(latin_char()), normal_topic()},
|
||||
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
|
||||
).
|
||||
|
||||
|
|
|
@ -186,10 +186,6 @@ t_systop(_) ->
|
|||
?assertEqual(SysTop2, systop(<<"abc">>)).
|
||||
|
||||
t_feed_var(_) ->
|
||||
?assertEqual(
|
||||
<<"$queue/client/clientId">>,
|
||||
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
|
||||
),
|
||||
?assertEqual(
|
||||
<<"username/test/client/x">>,
|
||||
feed_var(
|
||||
|
@ -211,10 +207,6 @@ long_topic() ->
|
|||
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
|
||||
|
||||
t_parse(_) ->
|
||||
?assertError(
|
||||
{invalid_topic_filter, <<"$queue/t">>},
|
||||
parse(<<"$queue/t">>, #{share => <<"g">>})
|
||||
),
|
||||
?assertError(
|
||||
{invalid_topic_filter, <<"$share/g/t">>},
|
||||
parse(<<"$share/g/t">>, #{share => <<"g">>})
|
||||
|
@ -229,11 +221,9 @@ t_parse(_) ->
|
|||
),
|
||||
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
|
||||
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
|
||||
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
|
||||
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
|
||||
%% The '$local' and '$fastlane' topics have been deprecated.
|
||||
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
|
||||
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
|
||||
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
|
||||
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).
|
||||
|
||||
|
|
|
@ -182,8 +182,6 @@ format({_Subscriber, Topic, Options}) ->
|
|||
maps:with([qos, nl, rap, rh], Options)
|
||||
).
|
||||
|
||||
get_topic(Topic, #{share := <<"$queue">> = Group}) ->
|
||||
filename:join([Group, Topic]);
|
||||
get_topic(Topic, #{share := Group}) ->
|
||||
filename:join([<<"$share">>, Group, Topic]);
|
||||
get_topic(Topic, _) ->
|
||||
|
|
Loading…
Reference in New Issue