fix(mgmt_api): kickout non-existing clientid should return code `404` (#6180)

This commit is contained in:
JimMoen 2021-11-17 12:29:27 +08:00 committed by GitHub
commit 57075902a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 24 deletions

View File

@ -119,6 +119,8 @@
-define(APP, emqx_management). -define(APP, emqx_management).
-elvis([{elvis_style, god_modules, disable}]).
%% TODO: remove these function after all api use minirest version 1.X %% TODO: remove these function after all api use minirest version 1.X
return() -> return() ->
ok. ok.
@ -147,7 +149,8 @@ node_info(Node) when Node =:= node() ->
memory_used => proplists:get_value(total, Memory), memory_used => proplists:get_value(total, Memory),
process_available => erlang:system_info(process_limit), process_available => erlang:system_info(process_limit),
process_used => erlang:system_info(process_count), process_used => erlang:system_info(process_count),
max_fds => proplists:get_value(max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))), max_fds => proplists:get_value(
max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))),
connections => ets:info(emqx_channel, size), connections => ets:info(emqx_channel, size),
node_status => 'Running', node_status => 'Running',
uptime => proplists:get_value(uptime, BrokerInfo), uptime => proplists:get_value(uptime, BrokerInfo),
@ -232,10 +235,12 @@ nodes_info_count(PropList) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
lookup_client({clientid, ClientId}, FormatFun) -> lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]); lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- mria_mnesia:running_nodes()]);
lookup_client({username, Username}, FormatFun) -> lookup_client({username, Username}, FormatFun) ->
lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]). lists:append([lookup_client(Node, {username, Username}, FormatFun)
|| Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() -> lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
lists:append(lists:map( lists:append(lists:map(
@ -256,11 +261,13 @@ lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
lookup_client(Node, {username, Username}, FormatFun) -> lookup_client(Node, {username, Username}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]). rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
kickout_client(ClientId) -> kickout_client({ClientID, FormatFun}) ->
Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()], case lookup_client({clientid, ClientID}, FormatFun) of
case lists:any(fun(Item) -> Item =:= ok end, Results) of [] ->
true -> ok; {error, not_found};
false -> lists:last(Results) _ ->
Results = [kickout_client(Node, ClientID) || Node <- mria_mnesia:running_nodes()],
check_results(Results)
end. end.
kickout_client(Node, ClientId) when Node =:= node() -> kickout_client(Node, ClientId) when Node =:= node() ->
@ -280,7 +287,7 @@ list_client_subscriptions(ClientId) ->
end, Results), end, Results),
case Expected of case Expected of
[] -> []; [] -> [];
[Result|_] -> Result [Result | _] -> Result
end. end.
client_subscriptions(Node, ClientId) when Node =:= node() -> client_subscriptions(Node, ClientId) when Node =:= node() ->
@ -291,10 +298,7 @@ client_subscriptions(Node, ClientId) ->
clean_authz_cache(ClientId) -> clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of check_results(Results).
true -> ok;
false -> lists:last(Results)
end.
clean_authz_cache(Node, ClientId) when Node =:= node() -> clean_authz_cache(Node, ClientId) when Node =:= node() ->
case emqx_cm:lookup_channels(ClientId) of case emqx_cm:lookup_channels(ClientId) of
@ -334,7 +338,7 @@ call_client(ClientId, Req) ->
end, Results), end, Results),
case Expected of case Expected of
[] -> {error, not_found}; [] -> {error, not_found};
[Result|_] -> Result [Result | _] -> Result
end. end.
%% @private %% @private
@ -345,7 +349,7 @@ call_client(Node, ClientId, Req) when Node =:= node() ->
Pid = lists:last(Pids), Pid = lists:last(Pids),
case emqx_cm:get_chan_info(ClientId, Pid) of case emqx_cm:get_chan_info(ClientId, Pid) of
#{conninfo := #{conn_mod := ConnMod}} -> #{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(Pid, Req); erlang:apply(ConnMod, call, [Pid, Req]);
undefined -> {error, not_found} undefined -> {error, not_found}
end end
end; end;
@ -366,11 +370,12 @@ list_subscriptions(Node) ->
rpc_call(Node, list_subscriptions, [Node]). rpc_call(Node, list_subscriptions, [Node]).
list_subscriptions_via_topic(Topic, FormatFun) -> list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]). lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|| Node <- mria_mnesia:running_nodes()]).
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() -> list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}], MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
M:F(ets:select(emqx_suboption, MatchSpec)); erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
list_subscriptions_via_topic(Node, Topic, FormatFun) -> list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]). rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
@ -497,8 +502,11 @@ listener_id_filter(Id, Listeners) ->
Filter = fun(#{id := Id0}) -> Id0 =:= Id end, Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
lists:filter(Filter, Listeners). lists:filter(Filter, Listeners).
-spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) -> -spec manage_listener( Operation :: start_listener
ok | {error, Reason :: term()}. | stop_listener
| restart_listener
, Param :: map()) ->
ok | {error, Reason :: term()}.
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()-> manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
erlang:apply(emqx_listeners, Operation, [ID]); erlang:apply(emqx_listeners, Operation, [ID]);
manage_listener(Operation, Param = #{node := Node}) -> manage_listener(Operation, Param = #{node := Node}) ->
@ -566,9 +574,12 @@ add_duration_field(Alarms) ->
add_duration_field([], _Now, Acc) -> add_duration_field([], _Now, Acc) ->
Acc; Acc;
add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt}| Rest], Now, Acc) -> add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]); add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
add_duration_field([Alarm = #{activated := false, activate_at := ActivateAt, deactivate_at := DeactivateAt}| Rest], Now, Acc) -> add_duration_field( [Alarm = #{ activated := false
, activate_at := ActivateAt
, deactivate_at := DeactivateAt} | Rest]
, Now, Acc) ->
add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]). add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -611,12 +622,18 @@ check_row_limit(Tables) ->
check_row_limit([], _Limit) -> check_row_limit([], _Limit) ->
ok; ok;
check_row_limit([Tab|Tables], Limit) -> check_row_limit([Tab | Tables], Limit) ->
case table_size(Tab) > Limit of case table_size(Tab) > Limit of
true -> false; true -> false;
false -> check_row_limit(Tables, Limit) false -> check_row_limit(Tables, Limit)
end. end.
check_results(Results) ->
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
max_row_limit() -> max_row_limit() ->
?MAX_ROW_LIMIT. ?MAX_ROW_LIMIT.

View File

@ -505,8 +505,12 @@ lookup(#{clientid := ClientID}) ->
end. end.
kickout(#{clientid := ClientID}) -> kickout(#{clientid := ClientID}) ->
emqx_mgmt:kickout_client(ClientID), case emqx_mgmt:kickout_client({ClientID, ?FORMAT_FUN}) of
{204}. {error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
_ ->
{204}
end.
get_authz_cache(#{clientid := ClientID})-> get_authz_cache(#{clientid := ClientID})->
case emqx_mgmt:list_authz_cache(ClientID) of case emqx_mgmt:list_authz_cache(ClientID) of