fix: topics/subscripton mgmt api searching

This commit is contained in:
JimMoen 2023-10-30 00:04:27 +08:00
parent a2015f37ae
commit 814e22feb3
No known key found for this signature in database
GPG Key ID: 87A520B4F76BA86D
3 changed files with 46 additions and 27 deletions

View File

@ -83,6 +83,8 @@ match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
match(words(Name), words(Filter)); match(words(Name), words(Filter));
match(#share{} = Name, Filter) -> match(#share{} = Name, Filter) ->
match_share(Name, Filter); match_share(Name, Filter);
match(Name, #share{} = Filter) ->
match_share(Name, Filter);
match([], []) -> match([], []) ->
true; true;
match([H | T1], [H | T2]) -> match([H | T1], [H | T2]) ->
@ -109,7 +111,10 @@ match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = F
match(words(Name), words(Filter)); match(words(Name), words(Filter));
match_share(#share{}, _) -> match_share(#share{}, _) ->
%% Otherwise, non-matched. %% Otherwise, non-matched.
false. false;
match_share(Name, #share{topic = Filter}) when is_binary(Name) ->
%% Only match real topic filter for normal topic_filter/topic_name.
match(Name, Filter).
-spec match_any(Name, [Filter]) -> boolean() when -spec match_any(Name, [Filter]) -> boolean() when
Name :: topic() | words(), Name :: topic() | words(),

View File

@ -142,31 +142,25 @@ parameters() ->
subscriptions(get, #{query_string := QString}) -> subscriptions(get, #{query_string := QString}) ->
Response = Response =
case maps:get(<<"node">>, QString, undefined) of try
undefined -> begin
emqx_mgmt_api:cluster_query( case maps:get(<<"match_topic">>, QString, undefined) of
?SUBOPTION, undefined ->
QString, do_subscriptions_query(QString);
?SUBS_QSCHEMA, MatchTopic ->
fun ?MODULE:qs2ms/2, case emqx_topic:parse(MatchTopic) of
fun ?MODULE:format/2 {#share{}, _} -> {error, invalid_match_topic};
); _ -> do_subscriptions_query(QString)
Node0 -> end
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
emqx_mgmt_api:node_query(
Node1,
?SUBOPTION,
QString,
?SUBS_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format/2
);
{error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}}
end end
end
catch
error:{invalid_topic_filter, _} ->
{error, invalid_match_topic}
end, end,
case Response of case Response of
{error, invalid_match_topic} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"match_topic_invalid">>}};
{error, page_limit_invalid} -> {error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}}; {400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} -> {error, Node, {badrpc, R}} ->
@ -176,6 +170,20 @@ subscriptions(get, #{query_string := QString}) ->
{200, Result} {200, Result}
end. end.
do_subscriptions_query(QString) ->
Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2],
case maps:get(<<"node">>, QString, undefined) of
undefined ->
erlang:apply(fun emqx_mgmt_api:cluster_query/5, Args);
Node0 ->
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
erlang:apply(fun emqx_mgmt_api:node_query/6, [Node1 | Args]);
{error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}}
end
end.
format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
maps:merge( maps:merge(
#{ #{
@ -228,5 +236,4 @@ fuzzy_filter_fun(Fuzzy) ->
run_fuzzy_filter(_, []) -> run_fuzzy_filter(_, []) ->
true; true;
run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) -> run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
{Filter, _SubOpts} = emqx_topic:parse(TopicFilter), emqx_topic:match(SubedTopic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy).

View File

@ -149,8 +149,15 @@ qs2ms(_Tab, {Qs, _}) ->
gen_match_spec([], Res) -> gen_match_spec([], Res) ->
Res; Res;
gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) -> gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) ->
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]); {T, D} =
case emqx_topic:parse(T0) of
{#share{group = Group, topic = Topic}, _SubOpts} ->
{Topic, {Group, Node}};
{T1, _SubOpts} ->
{T1, Node}
end,
gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]);
gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) -> gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]). gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).