Merge pull request #12855 from JimMoen/fix-share-queue-format
fix(mgmt): $queue shared topics format in mgmt topics api
This commit is contained in:
commit
91ffc95f29
|
@ -673,7 +673,6 @@ end).
|
||||||
|
|
||||||
-define(SHARE, "$share").
|
-define(SHARE, "$share").
|
||||||
-define(QUEUE, "$queue").
|
-define(QUEUE, "$queue").
|
||||||
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
|
||||||
|
|
||||||
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
-include("types.hrl").
|
-include("types.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("emqx_hooks.hrl").
|
-include("emqx_hooks.hrl").
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
start_link/0,
|
start_link/0,
|
||||||
|
@ -279,7 +280,7 @@ on_client_subscribed(
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
username => Username,
|
username => Username,
|
||||||
protocol => Protocol,
|
protocol => Protocol,
|
||||||
topic => Topic,
|
topic => emqx_topic:maybe_format_share(Topic),
|
||||||
subopts => SubOpts,
|
subopts => SubOpts,
|
||||||
ts => erlang:system_time(millisecond)
|
ts => erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
|
@ -298,7 +299,7 @@ on_client_unsubscribed(
|
||||||
clientid => ClientId,
|
clientid => ClientId,
|
||||||
username => Username,
|
username => Username,
|
||||||
protocol => Protocol,
|
protocol => Protocol,
|
||||||
topic => Topic,
|
topic => emqx_topic:maybe_format_share(Topic),
|
||||||
ts => erlang:system_time(millisecond)
|
ts => erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
publish(unsubscribed, Payload).
|
publish(unsubscribed, Payload).
|
||||||
|
|
|
@ -1004,9 +1004,9 @@ t_different_groups_same_topic(Config) when is_list(Config) ->
|
||||||
GroupB = <<"bb">>,
|
GroupB = <<"bb">>,
|
||||||
Topic = <<"t/1">>,
|
Topic = <<"t/1">>,
|
||||||
|
|
||||||
SharedTopicGroupA = ?SHARE(GroupA, Topic),
|
SharedTopicGroupA = format_share(GroupA, Topic),
|
||||||
?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
|
?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
|
||||||
SharedTopicGroupB = ?SHARE(GroupB, Topic),
|
SharedTopicGroupB = format_share(GroupB, Topic),
|
||||||
?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
|
?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
|
||||||
|
|
||||||
?retry(
|
?retry(
|
||||||
|
@ -1050,11 +1050,11 @@ t_different_groups_update_subopts(Config) when is_list(Config) ->
|
||||||
Topic = <<"t/1">>,
|
Topic = <<"t/1">>,
|
||||||
GroupA = <<"aa">>,
|
GroupA = <<"aa">>,
|
||||||
GroupB = <<"bb">>,
|
GroupB = <<"bb">>,
|
||||||
SharedTopicGroupA = ?SHARE(GroupA, Topic),
|
SharedTopicGroupA = format_share(GroupA, Topic),
|
||||||
SharedTopicGroupB = ?SHARE(GroupB, Topic),
|
SharedTopicGroupB = format_share(GroupB, Topic),
|
||||||
|
|
||||||
Fun = fun(Group, QoS) ->
|
Fun = fun(Group, QoS) ->
|
||||||
?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS),
|
?UPDATE_SUB_QOS(C, format_share(Group, Topic), QoS),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{qos := QoS},
|
#{qos := QoS},
|
||||||
emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
|
emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
|
||||||
|
@ -1153,6 +1153,9 @@ t_queue_subscription(Config) when is_list(Config) ->
|
||||||
%% help functions
|
%% help functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
format_share(Group, Topic) ->
|
||||||
|
emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)).
|
||||||
|
|
||||||
kill_process(Pid) ->
|
kill_process(Pid) ->
|
||||||
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
|
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
|
||||||
|
|
||||||
|
|
|
@ -225,7 +225,10 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
||||||
Meta#{hasnext => HasNext}.
|
Meta#{hasnext => HasNext}.
|
||||||
|
|
||||||
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
||||||
#{topic => ?SHARE(Group, Topic), node => Node};
|
#{
|
||||||
|
topic => emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)),
|
||||||
|
node => Node
|
||||||
|
};
|
||||||
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
||||||
#{topic => Topic, node => Node};
|
#{topic => Topic, node => Node};
|
||||||
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
|
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
|
||||||
|
|
|
@ -187,6 +187,30 @@ t_shared_topics(_Configs) ->
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_queue_topics(_Configs) ->
|
||||||
|
Node = atom_to_binary(node(), utf8),
|
||||||
|
RealTopic = <<"t/+">>,
|
||||||
|
Topic = <<"$queue/", RealTopic/binary>>,
|
||||||
|
|
||||||
|
Client = client(?FUNCTION_NAME),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
|
%% exact match with shared topic
|
||||||
|
MatchData = request_json(get, ["topics"], [
|
||||||
|
{"topic", Topic},
|
||||||
|
{"node", atom_to_list(node())}
|
||||||
|
]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1}
|
||||||
|
},
|
||||||
|
MatchData
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
t_shared_topics_invalid(_Config) ->
|
t_shared_topics_invalid(_Config) ->
|
||||||
%% no real topic
|
%% no real topic
|
||||||
InvalidShareTopicFilter = <<"$share/group">>,
|
InvalidShareTopicFilter = <<"$share/group">>,
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Fix when the client subscribes/unsubscribes to a shared topic, the system topic messages for Client subscribed/unsubscribed notification cannot be serialized correctly.
|
||||||
|
Fix the `$queue` shared topics format error in endpoint `/topics`.
|
Loading…
Reference in New Issue