From 814e22feb3038db995456447801529b7e5817a4c Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 30 Oct 2023 00:04:27 +0800 Subject: [PATCH 1/3] fix: topics/subscripton mgmt api searching --- apps/emqx/src/emqx_topic.erl | 7 ++- .../src/emqx_mgmt_api_subscriptions.erl | 55 +++++++++++-------- .../src/emqx_mgmt_api_topics.erl | 11 +++- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index 20dfd4316..76c6ef34e 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -83,6 +83,8 @@ match(Name, Filter) when is_binary(Name), is_binary(Filter) -> match(words(Name), words(Filter)); match(#share{} = Name, Filter) -> match_share(Name, Filter); +match(Name, #share{} = Filter) -> + match_share(Name, Filter); match([], []) -> true; match([H | T1], [H | T2]) -> @@ -109,7 +111,10 @@ match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = F match(words(Name), words(Filter)); match_share(#share{}, _) -> %% Otherwise, non-matched. - false. + false; +match_share(Name, #share{topic = Filter}) when is_binary(Name) -> + %% Only match real topic filter for normal topic_filter/topic_name. + match(Name, Filter). -spec match_any(Name, [Filter]) -> boolean() when Name :: topic() | words(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index d10c9d068..39ffb5972 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -142,31 +142,25 @@ parameters() -> subscriptions(get, #{query_string := QString}) -> Response = - case maps:get(<<"node">>, QString, undefined) of - undefined -> - emqx_mgmt_api:cluster_query( - ?SUBOPTION, - QString, - ?SUBS_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/2 - ); - Node0 -> - case emqx_utils:safe_to_existing_atom(Node0) of - {ok, Node1} -> - emqx_mgmt_api:node_query( - Node1, - ?SUBOPTION, - QString, - ?SUBS_QSCHEMA, - fun ?MODULE:qs2ms/2, - fun ?MODULE:format/2 - ); - {error, _} -> - {error, Node0, {badrpc, <<"invalid node">>}} + try + begin + case maps:get(<<"match_topic">>, QString, undefined) of + undefined -> + do_subscriptions_query(QString); + MatchTopic -> + case emqx_topic:parse(MatchTopic) of + {#share{}, _} -> {error, invalid_match_topic}; + _ -> do_subscriptions_query(QString) + end end + end + catch + error:{invalid_topic_filter, _} -> + {error, invalid_match_topic} end, case Response of + {error, invalid_match_topic} -> + {400, #{code => <<"INVALID_PARAMETER">>, message => <<"match_topic_invalid">>}}; {error, page_limit_invalid} -> {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {error, Node, {badrpc, R}} -> @@ -176,6 +170,20 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. +do_subscriptions_query(QString) -> + Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], + case maps:get(<<"node">>, QString, undefined) of + undefined -> + erlang:apply(fun emqx_mgmt_api:cluster_query/5, Args); + Node0 -> + case emqx_utils:safe_to_existing_atom(Node0) of + {ok, Node1} -> + erlang:apply(fun emqx_mgmt_api:node_query/6, [Node1 | Args]); + {error, _} -> + {error, Node0, {badrpc, <<"invalid node">>}} + end + end. + format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> maps:merge( #{ @@ -228,5 +236,4 @@ fuzzy_filter_fun(Fuzzy) -> run_fuzzy_filter(_, []) -> true; run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> - {Filter, _SubOpts} = emqx_topic:parse(TopicFilter), - emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy). + emqx_topic:match(SubedTopic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy). diff --git a/apps/emqx_management/src/emqx_mgmt_api_topics.erl b/apps/emqx_management/src/emqx_mgmt_api_topics.erl index 94bedd39f..31c70573f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_topics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_topics.erl @@ -149,8 +149,15 @@ qs2ms(_Tab, {Qs, _}) -> gen_match_spec([], Res) -> Res; -gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> - gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]); +gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) -> + {T, D} = + case emqx_topic:parse(T0) of + {#share{group = Group, topic = Topic}, _SubOpts} -> + {Topic, {Group, Node}}; + {T1, _SubOpts} -> + {T1, Node} + end, + gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]); gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). From e9de7316b62354ba8c2c6b98baa2249c3665bcb9 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 30 Oct 2023 14:41:57 +0800 Subject: [PATCH 2/3] test: shared-sub topics/subscription api --- .../test/emqx_mgmt_api_subscription_SUITE.erl | 50 +++++++++++++++++++ .../test/emqx_mgmt_api_topics_SUITE.erl | 32 ++++++++++++ 2 files changed, 82 insertions(+) diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index a23d70f2f..9ca3bb876 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -155,6 +155,56 @@ t_list_with_internal_subscription(_Config) -> ), ok. +t_list_with_shared_sub(_Config) -> + Client = proplists:get_value(client, _Config), + RealTopic = <<"t/+">>, + Topic = <<"$share/g1/", RealTopic/binary>>, + + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + QS = [ + {"clientid", ?CLIENTID}, + {"match_topic", "t/#"} + ], + Headers = emqx_mgmt_api_test_util:auth_header_(), + + ?assertMatch( + #{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]}, + request_json(get, QS, Headers) + ), + + ok. + +t_list_with_invalid_match_topic(_Config) -> + Client = proplists:get_value(client, _Config), + RealTopic = <<"t/+">>, + Topic = <<"$share/g1/", RealTopic/binary>>, + + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + QS = [ + {"clientid", ?CLIENTID}, + {"match_topic", "$share/g1/t/1"} + ], + Headers = emqx_mgmt_api_test_util:auth_header_(), + + ?assertMatch( + {error, + {{_, 400, _}, _, #{ + <<"message">> := <<"match_topic_invalid">>, + <<"code">> := <<"INVALID_PARAMETER">> + }}}, + begin + {error, {R, _H, Body}} = emqx_mgmt_api_test_util:request_api( + get, path(), uri_string:compose_query(QS), Headers, [], #{return_all => true} + ), + {error, {R, _H, emqx_utils_json:decode(Body, [return_maps])}} + end + ), + ok. + request_json(Method, Query, Headers) when is_list(Query) -> Qs = uri_string:compose_query(Query), {ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index a2f546267..854f1133b 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -123,3 +123,35 @@ t_percent_topics(_Config) -> ), ok = emqtt:stop(Client). + +t_shared_topics(_Configs) -> + Node = atom_to_binary(node(), utf8), + RealTopic = <<"t/+">>, + Topic = <<"$share/g1/", RealTopic/binary>>, + + {ok, Client} = emqtt:start_link(#{ + username => <<"routes_username">>, clientid => <<"routes_cid">> + }), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, Topic), + {ok, _, _} = emqtt:subscribe(Client, RealTopic), + + %% exact match with shared topic + Path = emqx_mgmt_api_test_util:api_path(["topics"]), + QS = uri_string:compose_query([ + {"topic", Topic}, + {"node", atom_to_list(node())} + ]), + Headers = emqx_mgmt_api_test_util:auth_header_(), + {ok, MatchResponse1} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers), + MatchData = emqx_utils_json:decode(MatchResponse1, [return_maps]), + ?assertMatch( + #{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100}, + maps:get(<<"meta">>, MatchData) + ), + ?assertMatch( + [#{<<"topic">> := Topic, <<"node">> := Node}], + maps:get(<<"data">>, MatchData) + ), + + ok = emqtt:stop(Client). From 3a09fdc495058bef64335698c1de5184c24a2cad Mon Sep 17 00:00:00 2001 From: JimMoen Date: Mon, 30 Oct 2023 16:40:56 +0800 Subject: [PATCH 3/3] refactor: check match topic before do subscriptions query --- .../src/emqx_mgmt_api_subscriptions.erl | 53 ++++++++++--------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 39ffb5972..ca0a7a625 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -142,21 +142,9 @@ parameters() -> subscriptions(get, #{query_string := QString}) -> Response = - try - begin - case maps:get(<<"match_topic">>, QString, undefined) of - undefined -> - do_subscriptions_query(QString); - MatchTopic -> - case emqx_topic:parse(MatchTopic) of - {#share{}, _} -> {error, invalid_match_topic}; - _ -> do_subscriptions_query(QString) - end - end - end - catch - error:{invalid_topic_filter, _} -> - {error, invalid_match_topic} + case check_match_topic(QString) of + ok -> do_subscriptions_query(QString); + {error, _} = Err -> Err end, case Response of {error, invalid_match_topic} -> @@ -170,6 +158,31 @@ subscriptions(get, #{query_string := QString}) -> {200, Result} end. +format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> + maps:merge( + #{ + topic => emqx_topic:maybe_format_share(Topic), + clientid => maps:get(subid, SubOpts, null), + node => WhichNode + }, + maps:with([qos, nl, rap, rh], SubOpts) + ). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +check_match_topic(#{<<"match_topic">> := MatchTopic}) -> + try emqx_topic:parse(MatchTopic) of + {#share{}, _} -> {error, invalid_match_topic}; + _ -> ok + catch + error:{invalid_topic_filter, _} -> + {error, invalid_match_topic} + end; +check_match_topic(_) -> + ok. + do_subscriptions_query(QString) -> Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], case maps:get(<<"node">>, QString, undefined) of @@ -184,16 +197,6 @@ do_subscriptions_query(QString) -> end end. -format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> - maps:merge( - #{ - topic => emqx_topic:maybe_format_share(Topic), - clientid => maps:get(subid, SubOpts, null), - node => WhichNode - }, - maps:with([qos, nl, rap, rh], SubOpts) - ). - %%-------------------------------------------------------------------- %% QueryString to MatchSpec %%--------------------------------------------------------------------