From 47e0f3bb1f3b376b58d12af1ad5b828da51ec3de Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 9 Apr 2024 16:40:39 +0800 Subject: [PATCH 1/3] fix(mgmt): $queue shared topics format in mgmt topics api --- apps/emqx/include/emqx_mqtt.hrl | 1 - apps/emqx/test/emqx_shared_sub_SUITE.erl | 13 ++++++---- .../src/emqx_mgmt_api_topics.erl | 5 +++- .../test/emqx_mgmt_api_topics_SUITE.erl | 24 +++++++++++++++++++ 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 63e2799fd..09f7495ea 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -673,7 +673,6 @@ end). -define(SHARE, "$share"). -define(QUEUE, "$queue"). --define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index df713ac74..040b3d295 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -1004,9 +1004,9 @@ t_different_groups_same_topic(Config) when is_list(Config) -> GroupB = <<"bb">>, Topic = <<"t/1">>, - SharedTopicGroupA = ?SHARE(GroupA, Topic), + SharedTopicGroupA = format_share(GroupA, Topic), ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2), - SharedTopicGroupB = ?SHARE(GroupB, Topic), + SharedTopicGroupB = format_share(GroupB, Topic), ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2), ?retry( @@ -1050,11 +1050,11 @@ t_different_groups_update_subopts(Config) when is_list(Config) -> Topic = <<"t/1">>, GroupA = <<"aa">>, GroupB = <<"bb">>, - SharedTopicGroupA = ?SHARE(GroupA, Topic), - SharedTopicGroupB = ?SHARE(GroupB, Topic), + SharedTopicGroupA = format_share(GroupA, Topic), + SharedTopicGroupB = format_share(GroupB, Topic), Fun = fun(Group, QoS) -> - ?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS), + ?UPDATE_SUB_QOS(C, format_share(Group, Topic), QoS), ?assertMatch( #{qos := QoS}, emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic)) @@ -1153,6 +1153,9 @@ t_queue_subscription(Config) when is_list(Config) -> %% help functions %%-------------------------------------------------------------------- +format_share(Group, Topic) -> + emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)). + kill_process(Pid) -> kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 1cb12f8f3..ff935ce10 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -225,7 +225,10 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) -> Meta#{hasnext => HasNext}. format(#route{topic = Topic, dest = {Group, Node}}) -> - #{topic => ?SHARE(Group, Topic), node => Node}; + #{ + topic => emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)), + node => Node + }; format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> #{topic => Topic, node => Node}; format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 55113c9e2..a8f912802 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -187,6 +187,30 @@ t_shared_topics(_Configs) -> ok = emqtt:stop(Client). +t_queue_topics(_Configs) -> + Node = atom_to_binary(node(), utf8), + RealTopic = <<"t/+">>, + Topic = <<"$queue/", RealTopic/binary>>, + + Client = client(?FUNCTION_NAME), + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + %% exact match with shared topic + MatchData = request_json(get, ["topics"], [ + {"topic", Topic}, + {"node", atom_to_list(node())} + ]), + ?assertMatch( + #{ + <<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}], + <<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1} + }, + MatchData + ), + + ok = emqtt:stop(Client). + t_shared_topics_invalid(_Config) -> %% no real topic InvalidShareTopicFilter = <<"$share/group">>, From 03a9c46ca7d4bdb4d9e62e2d71166aa9e7fb4f7a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 9 Apr 2024 16:53:20 +0800 Subject: [PATCH 2/3] fix(sys_topic): format shared topics --- apps/emqx/src/emqx_sys.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index cc8eec3af..f50e23235 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -22,6 +22,7 @@ -include("types.hrl"). -include("logger.hrl"). -include("emqx_hooks.hrl"). +-include("emqx_mqtt.hrl"). -export([ start_link/0, @@ -279,7 +280,7 @@ on_client_subscribed( clientid => ClientId, username => Username, protocol => Protocol, - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), subopts => SubOpts, ts => erlang:system_time(millisecond) }, @@ -298,7 +299,7 @@ on_client_unsubscribed( clientid => ClientId, username => Username, protocol => Protocol, - topic => Topic, + topic => emqx_topic:maybe_format_share(Topic), ts => erlang:system_time(millisecond) }, publish(unsubscribed, Payload). From a79df4ba690c2dbd83f740cf4fbbb4b5ea5ef1bb Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 9 Apr 2024 18:14:30 +0800 Subject: [PATCH 3/3] chore: add change log for #12855 --- changes/fix-12855.en.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 changes/fix-12855.en.md diff --git a/changes/fix-12855.en.md b/changes/fix-12855.en.md new file mode 100644 index 000000000..422008243 --- /dev/null +++ b/changes/fix-12855.en.md @@ -0,0 +1,2 @@ +Fix when the client subscribes/unsubscribes to a shared topic, the system topic messages for Client subscribed/unsubscribed notification cannot be serialized correctly. +Fix the `$queue` shared topics format error in endpoint `/topics`.