Merge pull request #11833 from JimMoen/fix-shared-sub-topic-or-subscription-searching
Fix shared sub topic or subscription searching
This commit is contained in:
commit
21e4f918aa
|
@ -83,6 +83,8 @@ match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
|
||||||
match(words(Name), words(Filter));
|
match(words(Name), words(Filter));
|
||||||
match(#share{} = Name, Filter) ->
|
match(#share{} = Name, Filter) ->
|
||||||
match_share(Name, Filter);
|
match_share(Name, Filter);
|
||||||
|
match(Name, #share{} = Filter) ->
|
||||||
|
match_share(Name, Filter);
|
||||||
match([], []) ->
|
match([], []) ->
|
||||||
true;
|
true;
|
||||||
match([H | T1], [H | T2]) ->
|
match([H | T1], [H | T2]) ->
|
||||||
|
@ -109,7 +111,10 @@ match_share(#share{group = Group, topic = Name}, #share{group = Group, topic = F
|
||||||
match(words(Name), words(Filter));
|
match(words(Name), words(Filter));
|
||||||
match_share(#share{}, _) ->
|
match_share(#share{}, _) ->
|
||||||
%% Otherwise, non-matched.
|
%% Otherwise, non-matched.
|
||||||
false.
|
false;
|
||||||
|
match_share(Name, #share{topic = Filter}) when is_binary(Name) ->
|
||||||
|
%% Only match real topic filter for normal topic_filter/topic_name.
|
||||||
|
match(Name, Filter).
|
||||||
|
|
||||||
-spec match_any(Name, [Filter]) -> boolean() when
|
-spec match_any(Name, [Filter]) -> boolean() when
|
||||||
Name :: topic() | words(),
|
Name :: topic() | words(),
|
||||||
|
|
|
@ -142,31 +142,13 @@ parameters() ->
|
||||||
|
|
||||||
subscriptions(get, #{query_string := QString}) ->
|
subscriptions(get, #{query_string := QString}) ->
|
||||||
Response =
|
Response =
|
||||||
case maps:get(<<"node">>, QString, undefined) of
|
case check_match_topic(QString) of
|
||||||
undefined ->
|
ok -> do_subscriptions_query(QString);
|
||||||
emqx_mgmt_api:cluster_query(
|
{error, _} = Err -> Err
|
||||||
?SUBOPTION,
|
|
||||||
QString,
|
|
||||||
?SUBS_QSCHEMA,
|
|
||||||
fun ?MODULE:qs2ms/2,
|
|
||||||
fun ?MODULE:format/2
|
|
||||||
);
|
|
||||||
Node0 ->
|
|
||||||
case emqx_utils:safe_to_existing_atom(Node0) of
|
|
||||||
{ok, Node1} ->
|
|
||||||
emqx_mgmt_api:node_query(
|
|
||||||
Node1,
|
|
||||||
?SUBOPTION,
|
|
||||||
QString,
|
|
||||||
?SUBS_QSCHEMA,
|
|
||||||
fun ?MODULE:qs2ms/2,
|
|
||||||
fun ?MODULE:format/2
|
|
||||||
);
|
|
||||||
{error, _} ->
|
|
||||||
{error, Node0, {badrpc, <<"invalid node">>}}
|
|
||||||
end
|
|
||||||
end,
|
end,
|
||||||
case Response of
|
case Response of
|
||||||
|
{error, invalid_match_topic} ->
|
||||||
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"match_topic_invalid">>}};
|
||||||
{error, page_limit_invalid} ->
|
{error, page_limit_invalid} ->
|
||||||
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
||||||
{error, Node, {badrpc, R}} ->
|
{error, Node, {badrpc, R}} ->
|
||||||
|
@ -186,6 +168,35 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
|
||||||
maps:with([qos, nl, rap, rh], SubOpts)
|
maps:with([qos, nl, rap, rh], SubOpts)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
|
||||||
|
try emqx_topic:parse(MatchTopic) of
|
||||||
|
{#share{}, _} -> {error, invalid_match_topic};
|
||||||
|
_ -> ok
|
||||||
|
catch
|
||||||
|
error:{invalid_topic_filter, _} ->
|
||||||
|
{error, invalid_match_topic}
|
||||||
|
end;
|
||||||
|
check_match_topic(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
do_subscriptions_query(QString) ->
|
||||||
|
Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2],
|
||||||
|
case maps:get(<<"node">>, QString, undefined) of
|
||||||
|
undefined ->
|
||||||
|
erlang:apply(fun emqx_mgmt_api:cluster_query/5, Args);
|
||||||
|
Node0 ->
|
||||||
|
case emqx_utils:safe_to_existing_atom(Node0) of
|
||||||
|
{ok, Node1} ->
|
||||||
|
erlang:apply(fun emqx_mgmt_api:node_query/6, [Node1 | Args]);
|
||||||
|
{error, _} ->
|
||||||
|
{error, Node0, {badrpc, <<"invalid node">>}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% QueryString to MatchSpec
|
%% QueryString to MatchSpec
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -228,5 +239,4 @@ fuzzy_filter_fun(Fuzzy) ->
|
||||||
run_fuzzy_filter(_, []) ->
|
run_fuzzy_filter(_, []) ->
|
||||||
true;
|
true;
|
||||||
run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
|
run_fuzzy_filter(E = {{SubedTopic, _}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
|
||||||
{Filter, _SubOpts} = emqx_topic:parse(TopicFilter),
|
emqx_topic:match(SubedTopic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
|
||||||
emqx_topic:match(SubedTopic, Filter) andalso run_fuzzy_filter(E, Fuzzy).
|
|
||||||
|
|
|
@ -149,8 +149,15 @@ qs2ms(_Tab, {Qs, _}) ->
|
||||||
|
|
||||||
gen_match_spec([], Res) ->
|
gen_match_spec([], Res) ->
|
||||||
Res;
|
Res;
|
||||||
gen_match_spec([{topic, '=:=', T} | Qs], [{{route, _, N}, [], ['$_']}]) ->
|
gen_match_spec([{topic, '=:=', T0} | Qs], [{{route, _, Node}, [], ['$_']}]) when is_atom(Node) ->
|
||||||
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]);
|
{T, D} =
|
||||||
|
case emqx_topic:parse(T0) of
|
||||||
|
{#share{group = Group, topic = Topic}, _SubOpts} ->
|
||||||
|
{Topic, {Group, Node}};
|
||||||
|
{T1, _SubOpts} ->
|
||||||
|
{T1, Node}
|
||||||
|
end,
|
||||||
|
gen_match_spec(Qs, [{{route, T, D}, [], ['$_']}]);
|
||||||
gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
|
gen_match_spec([{node, '=:=', N} | Qs], [{{route, T, _}, [], ['$_']}]) ->
|
||||||
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
|
gen_match_spec(Qs, [{{route, T, N}, [], ['$_']}]).
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,56 @@ t_list_with_internal_subscription(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_list_with_shared_sub(_Config) ->
|
||||||
|
Client = proplists:get_value(client, _Config),
|
||||||
|
RealTopic = <<"t/+">>,
|
||||||
|
Topic = <<"$share/g1/", RealTopic/binary>>,
|
||||||
|
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
|
QS = [
|
||||||
|
{"clientid", ?CLIENTID},
|
||||||
|
{"match_topic", "t/#"}
|
||||||
|
],
|
||||||
|
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]},
|
||||||
|
request_json(get, QS, Headers)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_list_with_invalid_match_topic(_Config) ->
|
||||||
|
Client = proplists:get_value(client, _Config),
|
||||||
|
RealTopic = <<"t/+">>,
|
||||||
|
Topic = <<"$share/g1/", RealTopic/binary>>,
|
||||||
|
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
|
QS = [
|
||||||
|
{"clientid", ?CLIENTID},
|
||||||
|
{"match_topic", "$share/g1/t/1"}
|
||||||
|
],
|
||||||
|
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{error,
|
||||||
|
{{_, 400, _}, _, #{
|
||||||
|
<<"message">> := <<"match_topic_invalid">>,
|
||||||
|
<<"code">> := <<"INVALID_PARAMETER">>
|
||||||
|
}}},
|
||||||
|
begin
|
||||||
|
{error, {R, _H, Body}} = emqx_mgmt_api_test_util:request_api(
|
||||||
|
get, path(), uri_string:compose_query(QS), Headers, [], #{return_all => true}
|
||||||
|
),
|
||||||
|
{error, {R, _H, emqx_utils_json:decode(Body, [return_maps])}}
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
request_json(Method, Query, Headers) when is_list(Query) ->
|
request_json(Method, Query, Headers) when is_list(Query) ->
|
||||||
Qs = uri_string:compose_query(Query),
|
Qs = uri_string:compose_query(Query),
|
||||||
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
|
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
|
||||||
|
|
|
@ -123,3 +123,35 @@ t_percent_topics(_Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
ok = emqtt:stop(Client).
|
ok = emqtt:stop(Client).
|
||||||
|
|
||||||
|
t_shared_topics(_Configs) ->
|
||||||
|
Node = atom_to_binary(node(), utf8),
|
||||||
|
RealTopic = <<"t/+">>,
|
||||||
|
Topic = <<"$share/g1/", RealTopic/binary>>,
|
||||||
|
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
username => <<"routes_username">>, clientid => <<"routes_cid">>
|
||||||
|
}),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, Topic),
|
||||||
|
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
|
||||||
|
|
||||||
|
%% exact match with shared topic
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(["topics"]),
|
||||||
|
QS = uri_string:compose_query([
|
||||||
|
{"topic", Topic},
|
||||||
|
{"node", atom_to_list(node())}
|
||||||
|
]),
|
||||||
|
Headers = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
{ok, MatchResponse1} = emqx_mgmt_api_test_util:request_api(get, Path, QS, Headers),
|
||||||
|
MatchData = emqx_utils_json:decode(MatchResponse1, [return_maps]),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"count">> := 1, <<"page">> := 1, <<"limit">> := 100},
|
||||||
|
maps:get(<<"meta">>, MatchData)
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
[#{<<"topic">> := Topic, <<"node">> := Node}],
|
||||||
|
maps:get(<<"data">>, MatchData)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(Client).
|
||||||
|
|
Loading…
Reference in New Issue