diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 63e2799fd..09f7495ea 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -673,7 +673,6 @@ end). -define(SHARE, "$share"). -define(QUEUE, "$queue"). --define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index df713ac74..040b3d295 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -1004,9 +1004,9 @@ t_different_groups_same_topic(Config) when is_list(Config) -> GroupB = <<"bb">>, Topic = <<"t/1">>, - SharedTopicGroupA = ?SHARE(GroupA, Topic), + SharedTopicGroupA = format_share(GroupA, Topic), ?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2), - SharedTopicGroupB = ?SHARE(GroupB, Topic), + SharedTopicGroupB = format_share(GroupB, Topic), ?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2), ?retry( @@ -1050,11 +1050,11 @@ t_different_groups_update_subopts(Config) when is_list(Config) -> Topic = <<"t/1">>, GroupA = <<"aa">>, GroupB = <<"bb">>, - SharedTopicGroupA = ?SHARE(GroupA, Topic), - SharedTopicGroupB = ?SHARE(GroupB, Topic), + SharedTopicGroupA = format_share(GroupA, Topic), + SharedTopicGroupB = format_share(GroupB, Topic), Fun = fun(Group, QoS) -> - ?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS), + ?UPDATE_SUB_QOS(C, format_share(Group, Topic), QoS), ?assertMatch( #{qos := QoS}, emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic)) @@ -1153,6 +1153,9 @@ t_queue_subscription(Config) when is_list(Config) -> %% help functions %%-------------------------------------------------------------------- +format_share(Group, Topic) -> + emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)). + kill_process(Pid) -> kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 1cb12f8f3..ff935ce10 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -225,7 +225,10 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) -> Meta#{hasnext => HasNext}. format(#route{topic = Topic, dest = {Group, Node}}) -> - #{topic => ?SHARE(Group, Topic), node => Node}; + #{ + topic => emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)), + node => Node + }; format(#route{topic = Topic, dest = Node}) when is_atom(Node) -> #{topic => Topic, node => Node}; format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 55113c9e2..a8f912802 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -187,6 +187,30 @@ t_shared_topics(_Configs) -> ok = emqtt:stop(Client). +t_queue_topics(_Configs) -> + Node = atom_to_binary(node(), utf8), + RealTopic = <<"t/+">>, + Topic = <<"$queue/", RealTopic/binary>>, + + Client = client(?FUNCTION_NAME), + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + %% exact match with shared topic + MatchData = request_json(get, ["topics"], [ + {"topic", Topic}, + {"node", atom_to_list(node())} + ]), + ?assertMatch( + #{ + <<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}], + <<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1} + }, + MatchData + ), + + ok = emqtt:stop(Client). + t_shared_topics_invalid(_Config) -> %% no real topic InvalidShareTopicFilter = <<"$share/group">>,