From 814e22feb3038db995456447801529b7e5817a4c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 30 Oct 2023 00:04:27 +0800 Subject: [PATCH] fix: topics/subscripton mgmt api searching --- apps/emqx/src/emqx_topic.erl | 7 ++- .../src/emqx_mgmt_api_subscriptions.erl | 55 +++++++++++-------- .../src/emqx_mgmt_api_topics.erl | 11 +++- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 20dfd4316..76c6ef34e 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -83,6 +83,8 @@ match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); match(#share{} = Name, Filter) -> match_share(Name, Filter); +match(Name, #share{} = Filter) -> + match_share(Name, Filter); match([], []) -> true; match([H | T1], [H | T2]) -> @@ -109,7 +111,10 @@ match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = F match(words(Name), words(Filter)); match_share(#share{}, _) -> %% Otherwise, non-matched. - false. + false; +match_share(Name, #share{topic = Filter}) when is_binary(Name) -> + %% Only match real topic filter for normal topic_filter/topic_name. + match(Name, Filter). -spec match_any(Name, [Filter]) -> boolean() when Name :: topic() | words(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index d10c9d068..39ffb5972 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -142,31 +142,25 @@ parameters() -> subscriptions(get, #{query_string := QString}) -> Response = - case maps:get(<<"node">>, QString, undefined) of - undefined -> - emqx_mgmt_api:cluster_query( - ?SUBOPTION, - QString, - ?SUBS_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/2 - ); - Node0 -> - case emqx_utils:safe_to_existing_atom(Node0) of - {ok, Node1} -> - emqx_mgmt_api:node_query( - Node1, - ?SUBOPTION, - QString, - ?SUBS_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/2 - ); - {error, _} -> - {error, Node0, {badrpc, <<"invalid node">>}} + try + begin + case maps:get(<<"match_topic">>, QString, undefined) of + undefined -> + do_subscriptions_query(QString); + MatchTopic -> + case emqx_topic:parse(MatchTopic) of + {#share{}, _} -> {error, invalid_match_topic}; + _ -> do_subscriptions_query(QString) + end end + end + catch + error:{invalid_topic_filter, _} -> + {error, invalid_match_topic} end, case Response of + {error, invalid_match_topic} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"match_topic_invalid">>}}; {error, page_limit_invalid} -> {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {error, Node, {badrpc, R}} -> @@ -176,6 +170,20 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. +do_subscriptions_query(QString) -> + Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], + case maps:get(<<"node">>, QString, undefined) of + undefined -> + erlang:apply(fun emqx_mgmt_api:cluster_query/5, Args); + Node0 -> + case emqx_utils:safe_to_existing_atom(Node0) of + {ok, Node1} -> + erlang:apply(fun emqx_mgmt_api:node_query/6, [Node1 | Args]); + {error, _} -> + {error, Node0, {badrpc, <<"invalid node">>}} + end + end. + format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> maps:merge( #{ @@ -228,5 +236,4 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> - {Filter, _SubOpts} = emqx_topic:parse(TopicFilter), - emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy). + emqx_topic:match(SubedTopic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 94bedd39f..31c70573f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -149,8 +149,15 @@ qs2ms(_Tab, {Qs, _}) -> gen_match_spec([], Res) -> Res; -gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> - gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]); +gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) -> + {T, D} = + case emqx_topic:parse(T0) of + {#share{group = Group, topic = Topic}, _SubOpts} -> + {Topic, {Group, Node}}; + {T1, _SubOpts} -> + {T1, Node} + end, + gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]); gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).