Merge pull request #7689 from zhongwencool/fix-sub-search

fix: topic filter with qos/clientid/share
This commit is contained in:
zhongwencool 2022-04-25 09:34:42 +08:00 committed by GitHub
commit f7f1b0ecbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 5 deletions

View File

@ -18,6 +18,10 @@ File format:
* Add support for JWT authorization [#7596] * Add support for JWT authorization [#7596]
Now MQTT clients may be authorized with respect to a specific claim containing publish/subscribe topic whitelists. Now MQTT clients may be authorized with respect to a specific claim containing publish/subscribe topic whitelists.
### Bug fixes
* List subscription topic (/api/v4/subscriptions), the result do not match with multiple conditions.
## v4.3.14 ## v4.3.14
### Enhancements ### Enhancements

View File

@ -1,6 +1,6 @@
{application, emqx_management, {application, emqx_management,
[{description, "EMQ X Management API and CLI"}, [{description, "EMQ X Management API and CLI"},
{vsn, "4.3.12"}, % strict semver, bump manually! {vsn, "4.3.13"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_management_sup]}, {registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest]}, {applications, [kernel,stdlib,minirest]},

View File

@ -1,13 +1,13 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ {<<"4\\.3\\.([0-9]|1[0-1])">>, [ {<<"4\\.3\\.([0-9]|1[0-2])">>,
[ {apply,{minirest,stop_http,['http:management']}}, [ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}}, {apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management} {restart_application, emqx_management}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ {<<"4\\.3\\.([0-9]|1[0-1])">>, [ {<<"4\\.3\\.([0-9]|1[0-2])">>,
[ {apply,{minirest,stop_http,['http:management']}}, [ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}}, {apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management} {restart_application, emqx_management}

View File

@ -67,7 +67,8 @@ list(Bindings, Params) when map_size(Bindings) == 0 ->
Topic0 -> Topic0 ->
Topic = emqx_mgmt_util:urldecode(Topic0), Topic = emqx_mgmt_util:urldecode(Topic0),
Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun), Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun),
minirest:return({ok, add_meta(Params, Data)}) FilterData = filter_subscriptions(Data, Params),
minirest:return({ok, add_meta(Params, FilterData)})
end; end;
list(#{node := Node} = Bindings, Params) -> list(#{node := Node} = Bindings, Params) ->
@ -85,7 +86,8 @@ list(#{node := Node} = Bindings, Params) ->
Topic0 -> Topic0 ->
Topic = emqx_mgmt_util:urldecode(Topic0), Topic = emqx_mgmt_util:urldecode(Topic0),
Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun), Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun),
minirest:return({ok, add_meta(Params, Data)}) FilterData = filter_subscriptions(Data, Params),
minirest:return({ok, add_meta(Params, FilterData)})
end. end.
add_meta(Params, List) -> add_meta(Params, List) ->
@ -169,3 +171,30 @@ update_ms(share, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{share => X}}; {{Pid, Topic}, Opts#{share => X}};
update_ms(qos, X, {{Pid, Topic}, Opts}) -> update_ms(qos, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{qos => X}}. {{Pid, Topic}, Opts#{qos => X}}.
filter_subscriptions(Data0, Params) ->
Data1 = filter_by_key(qos, qos(Params), Data0),
Data2 = filter_by_key(clientid, proplists:get_value(<<"clientid">>, Params), Data1),
case proplists:get_value(<<"share">>, Params) of
undefined -> Data2;
Share ->
Prefix = filename:join([<<"$share">>, Share]),
Size = byte_size(Prefix),
lists:filter(fun(#{topic := Topic}) ->
case Topic of
<<Prefix:Size/binary, _/binary>> -> true;
_ -> false
end
end,
Data2)
end.
qos(Params) ->
case proplists:get_value(<<"qos">>, Params) of
undefined -> undefined;
Qos when is_integer(Qos) -> Qos;
Qos when is_binary(Qos) -> binary_to_integer(Qos)
end.
filter_by_key(_Key, undefined, List) -> List;
filter_by_key(Key, Value, List) -> lists:filter(fun(E) -> Value =:= maps:get(Key, E) end, List).