fix: get_config return target node's node conf
This commit is contained in:
parent
a9191f3c33
commit
59ed65787f
|
@ -179,7 +179,7 @@ get_sys_memory() ->
|
|||
end.
|
||||
|
||||
node_info(Nodes) ->
|
||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)).
|
||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)).
|
||||
|
||||
stopped_node_info(Node) ->
|
||||
{Node, #{node => Node, node_status => 'stopped', role => core}}.
|
||||
|
@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) ->
|
|||
M#{K => iolist_to_binary(V)}.
|
||||
|
||||
broker_info(Nodes) ->
|
||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)).
|
||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Metrics and Stats
|
||||
|
@ -330,7 +330,7 @@ kickout_client(Node, ClientId) ->
|
|||
|
||||
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
||||
F = fun(Node) ->
|
||||
emqx_management_proto_v4:kickout_clients(Node, ClientIds)
|
||||
emqx_management_proto_v5:kickout_clients(Node, ClientIds)
|
||||
end,
|
||||
Results = lists:map(F, emqx:running_nodes()),
|
||||
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
||||
|
@ -446,7 +446,7 @@ do_call_client(ClientId, Req) ->
|
|||
|
||||
%% @private
|
||||
call_client(Node, ClientId, Req) ->
|
||||
unwrap_rpc(emqx_management_proto_v3:call_client(Node, ClientId, Req)).
|
||||
unwrap_rpc(emqx_management_proto_v5:call_client(Node, ClientId, Req)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Subscriptions
|
||||
|
@ -459,7 +459,7 @@ do_list_subscriptions() ->
|
|||
throw(not_implemented).
|
||||
|
||||
list_subscriptions(Node) ->
|
||||
unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
|
||||
unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)).
|
||||
|
||||
list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||
lists:append([
|
||||
|
@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) ->
|
|||
subscribe(emqx:running_nodes(), ClientId, TopicTables).
|
||||
|
||||
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
||||
case unwrap_rpc(emqx_management_proto_v3:subscribe(Node, ClientId, TopicTables)) of
|
||||
case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
|
||||
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
||||
{subscribe, Res} -> {subscribe, Res, Node}
|
||||
end;
|
||||
|
@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) ->
|
|||
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
||||
{unsubscribe, _} | {error, channel_not_found}.
|
||||
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
||||
case unwrap_rpc(emqx_management_proto_v3:unsubscribe(Node, ClientId, Topic)) of
|
||||
case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of
|
||||
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
||||
Re -> Re
|
||||
end;
|
||||
|
@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) ->
|
|||
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||
{unsubscribe_batch, _} | {error, channel_not_found}.
|
||||
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
||||
case unwrap_rpc(emqx_management_proto_v3:unsubscribe_batch(Node, ClientId, Topics)) of
|
||||
case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of
|
||||
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
||||
Re -> Re
|
||||
end;
|
||||
|
|
|
@ -350,7 +350,7 @@ configs(put, #{body := Conf, query_string := #{<<"mode">> := Mode}}, _Req) ->
|
|||
{error, Errors} -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Errors)}}
|
||||
end.
|
||||
|
||||
find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Perferences) > 0 ->
|
||||
find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Preferences) > 0 ->
|
||||
AcceptVal = maps:get(<<"accept">>, Headers, <<"*/*">>),
|
||||
%% Multiple types, weighted with the quality value syntax:
|
||||
%% Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8
|
||||
|
@ -363,11 +363,11 @@ find_suitable_accept(Headers, Perferences) when is_list(Perferences), length(Per
|
|||
),
|
||||
case lists:member(<<"*/*">>, Accepts) of
|
||||
true ->
|
||||
{ok, lists:nth(1, Perferences)};
|
||||
{ok, lists:nth(1, Preferences)};
|
||||
false ->
|
||||
Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Perferences),
|
||||
Found = lists:filter(fun(Accept) -> lists:member(Accept, Accepts) end, Preferences),
|
||||
case Found of
|
||||
[] -> {error, no_suitalbe_accept};
|
||||
[] -> {error, no_suitable_accept};
|
||||
_ -> {ok, lists:nth(1, Found)}
|
||||
end
|
||||
end.
|
||||
|
@ -376,7 +376,7 @@ get_configs_v1(QueryStr) ->
|
|||
Node = maps:get(<<"node">>, QueryStr, node()),
|
||||
case
|
||||
lists:member(Node, emqx:running_nodes()) andalso
|
||||
emqx_management_proto_v2:get_full_config(Node)
|
||||
emqx_management_proto_v5:get_full_config(Node)
|
||||
of
|
||||
false ->
|
||||
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
||||
|
@ -389,10 +389,13 @@ get_configs_v1(QueryStr) ->
|
|||
end.
|
||||
|
||||
get_configs_v2(QueryStr) ->
|
||||
Node = maps:get(<<"node">>, QueryStr, node()),
|
||||
Conf =
|
||||
case maps:find(<<"key">>, QueryStr) of
|
||||
error -> emqx_conf_cli:get_config();
|
||||
{ok, Key} -> emqx_conf_cli:get_config(atom_to_binary(Key))
|
||||
error ->
|
||||
emqx_management_proto_v5:get_full_config_v2(Node);
|
||||
{ok, Key} ->
|
||||
emqx_management_proto_v5:get_config_v2(Node, atom_to_binary(Key))
|
||||
end,
|
||||
{
|
||||
200,
|
||||
|
|
|
@ -515,7 +515,7 @@ list_listeners() ->
|
|||
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
|
||||
|
||||
list_listeners(Node) ->
|
||||
wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).
|
||||
wrap_rpc(emqx_management_proto_v5:list_listeners(Node)).
|
||||
|
||||
listener_status_by_id(NodeL) ->
|
||||
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_management_proto_v5).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([
|
||||
introduced_in/0,
|
||||
|
||||
node_info/1,
|
||||
broker_info/1,
|
||||
list_subscriptions/1,
|
||||
|
||||
list_listeners/1,
|
||||
subscribe/3,
|
||||
unsubscribe/3,
|
||||
unsubscribe_batch/3,
|
||||
|
||||
call_client/3,
|
||||
|
||||
get_full_config/1,
|
||||
get_full_config_v2/1,
|
||||
get_config_v2/2,
|
||||
|
||||
kickout_clients/2
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.1.1".
|
||||
|
||||
-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||
unsubscribe_batch(Node, ClientId, Topics) ->
|
||||
rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
|
||||
|
||||
-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
node_info(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000).
|
||||
|
||||
-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
||||
broker_info(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000).
|
||||
|
||||
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
|
||||
list_subscriptions(Node) ->
|
||||
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
||||
|
||||
-spec list_listeners(node()) -> map() | {badrpc, _}.
|
||||
list_listeners(Node) ->
|
||||
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
||||
|
||||
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
||||
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
||||
subscribe(Node, ClientId, TopicTables) ->
|
||||
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
|
||||
|
||||
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
|
||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
||||
unsubscribe(Node, ClientId, Topic) ->
|
||||
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
|
||||
|
||||
-spec call_client(node(), emqx_types:clientid(), term()) -> term().
|
||||
call_client(Node, ClientId, Req) ->
|
||||
rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
|
||||
|
||||
-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
|
||||
get_full_config(Node) ->
|
||||
rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
|
||||
|
||||
-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
|
||||
kickout_clients(Node, ClientIds) ->
|
||||
rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).
|
||||
|
||||
-spec get_full_config_v2(node()) -> map() | {badrpc, _}.
|
||||
get_full_config_v2(Node) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, []).
|
||||
|
||||
-spec get_config_v2(node(), binary()) -> map() | {badrpc, _}.
|
||||
get_config_v2(Node, Key) ->
|
||||
rpc:call(Node, emqx_conf_cli, get_config, [Key]).
|
|
@ -273,7 +273,7 @@ t_configs_node({'init', Config}) ->
|
|||
Node = node(),
|
||||
meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
|
||||
meck:expect(
|
||||
emqx_management_proto_v2,
|
||||
emqx_management_proto_v5,
|
||||
get_full_config,
|
||||
fun
|
||||
(Node0) when Node0 =:= Node -> <<"\"self\"">>;
|
||||
|
@ -283,7 +283,7 @@ t_configs_node({'init', Config}) ->
|
|||
),
|
||||
Config;
|
||||
t_configs_node({'end', _}) ->
|
||||
meck:unload([emqx, emqx_management_proto_v2]);
|
||||
meck:unload([emqx, emqx_management_proto_v5]);
|
||||
t_configs_node(_) ->
|
||||
Node = atom_to_list(node()),
|
||||
|
||||
|
|
Loading…
Reference in New Issue