fix(mgmt): $queue shared topics format in mgmt topics api
This commit is contained in:
parent
04ba2aaf8a
commit
47e0f3bb1f
|
@ -673,7 +673,6 @@ end).
|
||||||
|
|
||||||
-define(SHARE, "$share").
|
-define(SHARE, "$share").
|
||||||
-define(QUEUE, "$queue").
|
-define(QUEUE, "$queue").
|
||||||
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
|
|
||||||
|
|
||||||
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
|
|
|
@ -1004,9 +1004,9 @@ t_different_groups_same_topic(Config) when is_list(Config) ->
|
||||||
GroupB = <<"bb">>,
|
GroupB = <<"bb">>,
|
||||||
Topic = <<"t/1">>,
|
Topic = <<"t/1">>,
|
||||||
|
|
||||||
SharedTopicGroupA = ?SHARE(GroupA, Topic),
|
SharedTopicGroupA = format_share(GroupA, Topic),
|
||||||
?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
|
?UPDATE_SUB_QOS(C, SharedTopicGroupA, ?QOS_2),
|
||||||
SharedTopicGroupB = ?SHARE(GroupB, Topic),
|
SharedTopicGroupB = format_share(GroupB, Topic),
|
||||||
?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
|
?UPDATE_SUB_QOS(C, SharedTopicGroupB, ?QOS_2),
|
||||||
|
|
||||||
?retry(
|
?retry(
|
||||||
|
@ -1050,11 +1050,11 @@ t_different_groups_update_subopts(Config) when is_list(Config) ->
|
||||||
Topic = <<"t/1">>,
|
Topic = <<"t/1">>,
|
||||||
GroupA = <<"aa">>,
|
GroupA = <<"aa">>,
|
||||||
GroupB = <<"bb">>,
|
GroupB = <<"bb">>,
|
||||||
SharedTopicGroupA = ?SHARE(GroupA, Topic),
|
SharedTopicGroupA = format_share(GroupA, Topic),
|
||||||
SharedTopicGroupB = ?SHARE(GroupB, Topic),
|
SharedTopicGroupB = format_share(GroupB, Topic),
|
||||||
|
|
||||||
Fun = fun(Group, QoS) ->
|
Fun = fun(Group, QoS) ->
|
||||||
?UPDATE_SUB_QOS(C, ?SHARE(Group, Topic), QoS),
|
?UPDATE_SUB_QOS(C, format_share(Group, Topic), QoS),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{qos := QoS},
|
#{qos := QoS},
|
||||||
emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
|
emqx_broker:get_subopts(ClientId, emqx_topic:make_shared_record(Group, Topic))
|
||||||
|
@ -1153,6 +1153,9 @@ t_queue_subscription(Config) when is_list(Config) ->
|
||||||
%% help functions
|
%% help functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
format_share(Group, Topic) ->
|
||||||
|
emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)).
|
||||||
|
|
||||||
kill_process(Pid) ->
|
kill_process(Pid) ->
|
||||||
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
|
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
|
||||||
|
|
||||||
|
|
|
@ -225,7 +225,10 @@ format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
||||||
Meta#{hasnext => HasNext}.
|
Meta#{hasnext => HasNext}.
|
||||||
|
|
||||||
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
||||||
#{topic => ?SHARE(Group, Topic), node => Node};
|
#{
|
||||||
|
topic => emqx_topic:maybe_format_share(emqx_topic:make_shared_record(Group, Topic)),
|
||||||
|
node => Node
|
||||||
|
};
|
||||||
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
||||||
#{topic => Topic, node => Node};
|
#{topic => Topic, node => Node};
|
||||||
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
|
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
|
||||||
|
|
|
@ -187,6 +187,30 @@ t_shared_topics(_Configs) ->
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_queue_topics(_Configs) ->
|
||||||
|
Node = atom_to_binary(node(), utf8),
|
||||||
|
RealTopic = <<"t/+">>,
|
||||||
|
Topic = <<"$queue/", RealTopic/binary>>,
|
||||||
|
|
||||||
|
Client = client(?FUNCTION_NAME),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
|
%% exact match with shared topic
|
||||||
|
MatchData = request_json(get, ["topics"], [
|
||||||
|
{"topic", Topic},
|
||||||
|
{"node", atom_to_list(node())}
|
||||||
|
]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"data">> := [#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
|
<<"meta">> := #{<<"page">> := 1, <<"limit">> := 100, <<"count">> := 1}
|
||||||
|
},
|
||||||
|
MatchData
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
t_shared_topics_invalid(_Config) ->
|
t_shared_topics_invalid(_Config) ->
|
||||||
%% no real topic
|
%% no real topic
|
||||||
InvalidShareTopicFilter = <<"$share/group">>,
|
InvalidShareTopicFilter = <<"$share/group">>,
|
||||||
|
|
Loading…
Reference in New Issue