chore: move rpc call from emqx_management_proto to emqx_conf_proto
This commit is contained in:
parent
8a0f7bfd99
commit
402f75592e
|
@ -12,6 +12,7 @@
|
||||||
{emqx_cm,2}.
|
{emqx_cm,2}.
|
||||||
{emqx_conf,1}.
|
{emqx_conf,1}.
|
||||||
{emqx_conf,2}.
|
{emqx_conf,2}.
|
||||||
|
{emqx_conf,3}.
|
||||||
{emqx_dashboard,1}.
|
{emqx_dashboard,1}.
|
||||||
{emqx_delayed,1}.
|
{emqx_delayed,1}.
|
||||||
{emqx_eviction_agent,1}.
|
{emqx_eviction_agent,1}.
|
||||||
|
@ -28,7 +29,6 @@
|
||||||
{emqx_management,2}.
|
{emqx_management,2}.
|
||||||
{emqx_management,3}.
|
{emqx_management,3}.
|
||||||
{emqx_management,4}.
|
{emqx_management,4}.
|
||||||
{emqx_management,5}.
|
|
||||||
{emqx_metrics,1}.
|
{emqx_metrics,1}.
|
||||||
{emqx_mgmt_api_plugins,1}.
|
{emqx_mgmt_api_plugins,1}.
|
||||||
{emqx_mgmt_api_plugins,2}.
|
{emqx_mgmt_api_plugins,2}.
|
||||||
|
|
|
@ -71,7 +71,7 @@ get_raw(KeyPath) ->
|
||||||
%% @doc Returns all values in the cluster.
|
%% @doc Returns all values in the cluster.
|
||||||
-spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
|
-spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}.
|
||||||
get_all(KeyPath) ->
|
get_all(KeyPath) ->
|
||||||
{ResL, []} = emqx_conf_proto_v2:get_all(KeyPath),
|
{ResL, []} = emqx_conf_proto_v3:get_all(KeyPath),
|
||||||
maps:from_list(ResL).
|
maps:from_list(ResL).
|
||||||
|
|
||||||
%% @doc Returns the specified node's KeyPath, or exception if not found
|
%% @doc Returns the specified node's KeyPath, or exception if not found
|
||||||
|
@ -79,14 +79,14 @@ get_all(KeyPath) ->
|
||||||
get_by_node(Node, KeyPath) when Node =:= node() ->
|
get_by_node(Node, KeyPath) when Node =:= node() ->
|
||||||
emqx:get_config(KeyPath);
|
emqx:get_config(KeyPath);
|
||||||
get_by_node(Node, KeyPath) ->
|
get_by_node(Node, KeyPath) ->
|
||||||
emqx_conf_proto_v2:get_config(Node, KeyPath).
|
emqx_conf_proto_v3:get_config(Node, KeyPath).
|
||||||
|
|
||||||
%% @doc Returns the specified node's KeyPath, or the default value if not found
|
%% @doc Returns the specified node's KeyPath, or the default value if not found
|
||||||
-spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
|
-spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term().
|
||||||
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
|
get_by_node(Node, KeyPath, Default) when Node =:= node() ->
|
||||||
emqx:get_config(KeyPath, Default);
|
emqx:get_config(KeyPath, Default);
|
||||||
get_by_node(Node, KeyPath, Default) ->
|
get_by_node(Node, KeyPath, Default) ->
|
||||||
emqx_conf_proto_v2:get_config(Node, KeyPath, Default).
|
emqx_conf_proto_v3:get_config(Node, KeyPath, Default).
|
||||||
|
|
||||||
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
|
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
|
||||||
-spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
|
-spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term().
|
||||||
|
@ -101,7 +101,7 @@ get_node_and_config(KeyPath) ->
|
||||||
) ->
|
) ->
|
||||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
update(KeyPath, UpdateReq, Opts) ->
|
update(KeyPath, UpdateReq, Opts) ->
|
||||||
emqx_conf_proto_v2:update(KeyPath, UpdateReq, Opts).
|
emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts).
|
||||||
|
|
||||||
%% @doc Update the specified node's key path in local-override.conf.
|
%% @doc Update the specified node's key path in local-override.conf.
|
||||||
-spec update(
|
-spec update(
|
||||||
|
@ -114,7 +114,7 @@ update(KeyPath, UpdateReq, Opts) ->
|
||||||
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
|
update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() ->
|
||||||
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
|
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
|
||||||
update(Node, KeyPath, UpdateReq, Opts) ->
|
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||||
emqx_conf_proto_v2:update(Node, KeyPath, UpdateReq, Opts).
|
emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts).
|
||||||
|
|
||||||
%% @doc Mark the specified key path as tombstone
|
%% @doc Mark the specified key path as tombstone
|
||||||
tombstone(KeyPath, Opts) ->
|
tombstone(KeyPath, Opts) ->
|
||||||
|
@ -124,7 +124,7 @@ tombstone(KeyPath, Opts) ->
|
||||||
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
-spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
remove(KeyPath, Opts) ->
|
remove(KeyPath, Opts) ->
|
||||||
emqx_conf_proto_v2:remove_config(KeyPath, Opts).
|
emqx_conf_proto_v3:remove_config(KeyPath, Opts).
|
||||||
|
|
||||||
%% @doc remove the specified node's key path in local-override.conf.
|
%% @doc remove the specified node's key path in local-override.conf.
|
||||||
-spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
-spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
@ -132,13 +132,13 @@ remove(KeyPath, Opts) ->
|
||||||
remove(Node, KeyPath, Opts) when Node =:= node() ->
|
remove(Node, KeyPath, Opts) when Node =:= node() ->
|
||||||
emqx:remove_config(KeyPath, Opts#{override_to => local});
|
emqx:remove_config(KeyPath, Opts#{override_to => local});
|
||||||
remove(Node, KeyPath, Opts) ->
|
remove(Node, KeyPath, Opts) ->
|
||||||
emqx_conf_proto_v2:remove_config(Node, KeyPath, Opts).
|
emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts).
|
||||||
|
|
||||||
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
|
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
|
||||||
-spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
-spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
reset(KeyPath, Opts) ->
|
reset(KeyPath, Opts) ->
|
||||||
emqx_conf_proto_v2:reset(KeyPath, Opts).
|
emqx_conf_proto_v3:reset(KeyPath, Opts).
|
||||||
|
|
||||||
%% @doc reset the specified node's key path in local-override.conf.
|
%% @doc reset the specified node's key path in local-override.conf.
|
||||||
-spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
-spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
@ -146,7 +146,7 @@ reset(KeyPath, Opts) ->
|
||||||
reset(Node, KeyPath, Opts) when Node =:= node() ->
|
reset(Node, KeyPath, Opts) when Node =:= node() ->
|
||||||
emqx:reset_config(KeyPath, Opts#{override_to => local});
|
emqx:reset_config(KeyPath, Opts#{override_to => local});
|
||||||
reset(Node, KeyPath, Opts) ->
|
reset(Node, KeyPath, Opts) ->
|
||||||
emqx_conf_proto_v2:reset(Node, KeyPath, Opts).
|
emqx_conf_proto_v3:reset(Node, KeyPath, Opts).
|
||||||
|
|
||||||
%% @doc Called from build script.
|
%% @doc Called from build script.
|
||||||
%% TODO: move to a external escript after all refactoring is done
|
%% TODO: move to a external escript after all refactoring is done
|
||||||
|
|
|
@ -137,7 +137,7 @@ sync_cluster_conf() ->
|
||||||
|
|
||||||
%% @private Some core nodes are running, try to sync the cluster config from them.
|
%% @private Some core nodes are running, try to sync the cluster config from them.
|
||||||
sync_cluster_conf2(Nodes) ->
|
sync_cluster_conf2(Nodes) ->
|
||||||
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
|
{Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
|
||||||
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
|
||||||
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
|
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
|
||||||
case (Failed =/= [] orelse NotReady =/= []) of
|
case (Failed =/= [] orelse NotReady =/= []) of
|
||||||
|
@ -284,7 +284,7 @@ conf_sort({ok, _}, {ok, _}) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
sync_data_from_node(Node) ->
|
sync_data_from_node(Node) ->
|
||||||
case emqx_conf_proto_v2:sync_data_from_node(Node) of
|
case emqx_conf_proto_v3:sync_data_from_node(Node) of
|
||||||
{ok, DataBin} ->
|
{ok, DataBin} ->
|
||||||
case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
|
case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of
|
||||||
{ok, []} ->
|
{ok, []} ->
|
||||||
|
|
|
@ -0,0 +1,119 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_conf_proto_v3).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
sync_data_from_node/1,
|
||||||
|
get_config/2,
|
||||||
|
get_config/3,
|
||||||
|
get_all/1,
|
||||||
|
|
||||||
|
update/3,
|
||||||
|
update/4,
|
||||||
|
remove_config/2,
|
||||||
|
remove_config/3,
|
||||||
|
|
||||||
|
reset/2,
|
||||||
|
reset/3,
|
||||||
|
|
||||||
|
get_override_config_file/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([get_hocon_config/1, get_hocon_config/2]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.1.1".
|
||||||
|
|
||||||
|
-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc().
|
||||||
|
sync_data_from_node(Node) ->
|
||||||
|
rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000).
|
||||||
|
-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...].
|
||||||
|
|
||||||
|
-spec get_config(node(), emqx_utils_maps:config_key_path()) ->
|
||||||
|
term() | emqx_rpc:badrpc().
|
||||||
|
get_config(Node, KeyPath) ->
|
||||||
|
rpc:call(Node, emqx, get_config, [KeyPath]).
|
||||||
|
|
||||||
|
-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) ->
|
||||||
|
term() | emqx_rpc:badrpc().
|
||||||
|
get_config(Node, KeyPath, Default) ->
|
||||||
|
rpc:call(Node, emqx, get_config, [KeyPath, Default]).
|
||||||
|
|
||||||
|
-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result().
|
||||||
|
get_all(KeyPath) ->
|
||||||
|
rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000).
|
||||||
|
|
||||||
|
-spec update(
|
||||||
|
update_config_key_path(),
|
||||||
|
emqx_config:update_request(),
|
||||||
|
emqx_config:update_opts()
|
||||||
|
) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
|
update(KeyPath, UpdateReq, Opts) ->
|
||||||
|
emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]).
|
||||||
|
|
||||||
|
-spec update(
|
||||||
|
node(),
|
||||||
|
update_config_key_path(),
|
||||||
|
emqx_config:update_request(),
|
||||||
|
emqx_config:update_opts()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_config:update_result()}
|
||||||
|
| {error, emqx_config:update_error()}
|
||||||
|
| emqx_rpc:badrpc().
|
||||||
|
update(Node, KeyPath, UpdateReq, Opts) ->
|
||||||
|
rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
|
||||||
|
|
||||||
|
-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
|
remove_config(KeyPath, Opts) ->
|
||||||
|
emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
|
||||||
|
|
||||||
|
-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
{ok, emqx_config:update_result()}
|
||||||
|
| {error, emqx_config:update_error()}
|
||||||
|
| emqx_rpc:badrpc().
|
||||||
|
remove_config(Node, KeyPath, Opts) ->
|
||||||
|
rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000).
|
||||||
|
|
||||||
|
-spec reset(update_config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||||
|
reset(KeyPath, Opts) ->
|
||||||
|
emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]).
|
||||||
|
|
||||||
|
-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) ->
|
||||||
|
{ok, emqx_config:update_result()}
|
||||||
|
| {error, emqx_config:update_error()}
|
||||||
|
| emqx_rpc:badrpc().
|
||||||
|
reset(Node, KeyPath, Opts) ->
|
||||||
|
rpc:call(Node, emqx, reset_config, [KeyPath, Opts]).
|
||||||
|
|
||||||
|
-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result().
|
||||||
|
get_override_config_file(Nodes) ->
|
||||||
|
rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000).
|
||||||
|
|
||||||
|
-spec get_hocon_config(node()) -> map() | {badrpc, _}.
|
||||||
|
get_hocon_config(Node) ->
|
||||||
|
rpc:call(Node, emqx_conf_cli, get_config, []).
|
||||||
|
|
||||||
|
-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}.
|
||||||
|
get_hocon_config(Node, Key) ->
|
||||||
|
rpc:call(Node, emqx_conf_cli, get_config, [Key]).
|
|
@ -179,7 +179,7 @@ get_sys_memory() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
node_info(Nodes) ->
|
node_info(Nodes) ->
|
||||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:node_info(Nodes)).
|
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:node_info(Nodes)).
|
||||||
|
|
||||||
stopped_node_info(Node) ->
|
stopped_node_info(Node) ->
|
||||||
{Node, #{node => Node, node_status => 'stopped', role => core}}.
|
{Node, #{node => Node, node_status => 'stopped', role => core}}.
|
||||||
|
@ -223,7 +223,7 @@ convert_broker_info({K, V}, M) ->
|
||||||
M#{K => iolist_to_binary(V)}.
|
M#{K => iolist_to_binary(V)}.
|
||||||
|
|
||||||
broker_info(Nodes) ->
|
broker_info(Nodes) ->
|
||||||
emqx_rpc:unwrap_erpc(emqx_management_proto_v5:broker_info(Nodes)).
|
emqx_rpc:unwrap_erpc(emqx_management_proto_v4:broker_info(Nodes)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Metrics and Stats
|
%% Metrics and Stats
|
||||||
|
@ -330,7 +330,7 @@ kickout_client(Node, ClientId) ->
|
||||||
|
|
||||||
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
kickout_clients(ClientIds) when is_list(ClientIds) ->
|
||||||
F = fun(Node) ->
|
F = fun(Node) ->
|
||||||
emqx_management_proto_v5:kickout_clients(Node, ClientIds)
|
emqx_management_proto_v4:kickout_clients(Node, ClientIds)
|
||||||
end,
|
end,
|
||||||
Results = lists:map(F, emqx:running_nodes()),
|
Results = lists:map(F, emqx:running_nodes()),
|
||||||
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
case lists:filter(fun(Res) -> Res =/= ok end, Results) of
|
||||||
|
@ -446,7 +446,7 @@ do_call_client(ClientId, Req) ->
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
call_client(Node, ClientId, Req) ->
|
call_client(Node, ClientId, Req) ->
|
||||||
unwrap_rpc(emqx_management_proto_v5:call_client(Node, ClientId, Req)).
|
unwrap_rpc(emqx_management_proto_v4:call_client(Node, ClientId, Req)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Subscriptions
|
%% Subscriptions
|
||||||
|
@ -459,7 +459,7 @@ do_list_subscriptions() ->
|
||||||
throw(not_implemented).
|
throw(not_implemented).
|
||||||
|
|
||||||
list_subscriptions(Node) ->
|
list_subscriptions(Node) ->
|
||||||
unwrap_rpc(emqx_management_proto_v5:list_subscriptions(Node)).
|
unwrap_rpc(emqx_management_proto_v4:list_subscriptions(Node)).
|
||||||
|
|
||||||
list_subscriptions_via_topic(Topic, FormatFun) ->
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||||
lists:append([
|
lists:append([
|
||||||
|
@ -481,7 +481,7 @@ subscribe(ClientId, TopicTables) ->
|
||||||
subscribe(emqx:running_nodes(), ClientId, TopicTables).
|
subscribe(emqx:running_nodes(), ClientId, TopicTables).
|
||||||
|
|
||||||
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v5:subscribe(Node, ClientId, TopicTables)) of
|
case unwrap_rpc(emqx_management_proto_v4:subscribe(Node, ClientId, TopicTables)) of
|
||||||
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
||||||
{subscribe, Res} -> {subscribe, Res, Node}
|
{subscribe, Res} -> {subscribe, Res, Node}
|
||||||
end;
|
end;
|
||||||
|
@ -508,7 +508,7 @@ unsubscribe(ClientId, Topic) ->
|
||||||
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
|
||||||
{unsubscribe, _} | {error, channel_not_found}.
|
{unsubscribe, _} | {error, channel_not_found}.
|
||||||
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v5:unsubscribe(Node, ClientId, Topic)) of
|
case unwrap_rpc(emqx_management_proto_v4:unsubscribe(Node, ClientId, Topic)) of
|
||||||
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
@ -531,7 +531,7 @@ unsubscribe_batch(ClientId, Topics) ->
|
||||||
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
|
||||||
{unsubscribe_batch, _} | {error, channel_not_found}.
|
{unsubscribe_batch, _} | {error, channel_not_found}.
|
||||||
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
|
||||||
case unwrap_rpc(emqx_management_proto_v5:unsubscribe_batch(Node, ClientId, Topics)) of
|
case unwrap_rpc(emqx_management_proto_v4:unsubscribe_batch(Node, ClientId, Topics)) of
|
||||||
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
|
||||||
Re -> Re
|
Re -> Re
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -372,11 +372,12 @@ find_suitable_accept(Headers, Preferences) when is_list(Preferences), length(Pre
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Should deprecated json v1 since 5.2.0
|
||||||
get_configs_v1(QueryStr) ->
|
get_configs_v1(QueryStr) ->
|
||||||
Node = maps:get(<<"node">>, QueryStr, node()),
|
Node = maps:get(<<"node">>, QueryStr, node()),
|
||||||
case
|
case
|
||||||
lists:member(Node, emqx:running_nodes()) andalso
|
lists:member(Node, emqx:running_nodes()) andalso
|
||||||
emqx_management_proto_v5:get_full_config(Node)
|
emqx_management_proto_v4:get_full_config(Node)
|
||||||
of
|
of
|
||||||
false ->
|
false ->
|
||||||
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
|
||||||
|
@ -393,9 +394,9 @@ get_configs_v2(QueryStr) ->
|
||||||
Conf =
|
Conf =
|
||||||
case maps:find(<<"key">>, QueryStr) of
|
case maps:find(<<"key">>, QueryStr) of
|
||||||
error ->
|
error ->
|
||||||
emqx_management_proto_v5:get_full_config_v2(Node);
|
emqx_conf_proto_v3:get_hocon_config(Node);
|
||||||
{ok, Key} ->
|
{ok, Key} ->
|
||||||
emqx_management_proto_v5:get_config_v2(Node, atom_to_binary(Key))
|
emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key))
|
||||||
end,
|
end,
|
||||||
{
|
{
|
||||||
200,
|
200,
|
||||||
|
|
|
@ -515,7 +515,7 @@ list_listeners() ->
|
||||||
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
|
lists:map(fun list_listeners/1, [Self | lists:delete(Self, emqx:running_nodes())]).
|
||||||
|
|
||||||
list_listeners(Node) ->
|
list_listeners(Node) ->
|
||||||
wrap_rpc(emqx_management_proto_v5:list_listeners(Node)).
|
wrap_rpc(emqx_management_proto_v4:list_listeners(Node)).
|
||||||
|
|
||||||
listener_status_by_id(NodeL) ->
|
listener_status_by_id(NodeL) ->
|
||||||
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),
|
||||||
|
|
|
@ -1,96 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_management_proto_v5).
|
|
||||||
|
|
||||||
-behaviour(emqx_bpapi).
|
|
||||||
|
|
||||||
-export([
|
|
||||||
introduced_in/0,
|
|
||||||
|
|
||||||
node_info/1,
|
|
||||||
broker_info/1,
|
|
||||||
list_subscriptions/1,
|
|
||||||
|
|
||||||
list_listeners/1,
|
|
||||||
subscribe/3,
|
|
||||||
unsubscribe/3,
|
|
||||||
unsubscribe_batch/3,
|
|
||||||
|
|
||||||
call_client/3,
|
|
||||||
|
|
||||||
get_full_config/1,
|
|
||||||
get_full_config_v2/1,
|
|
||||||
get_config_v2/2,
|
|
||||||
|
|
||||||
kickout_clients/2
|
|
||||||
]).
|
|
||||||
|
|
||||||
-include_lib("emqx/include/bpapi.hrl").
|
|
||||||
|
|
||||||
introduced_in() ->
|
|
||||||
"5.1.1".
|
|
||||||
|
|
||||||
-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
|
|
||||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
|
||||||
unsubscribe_batch(Node, ClientId, Topics) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
|
|
||||||
|
|
||||||
-spec node_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
|
||||||
node_info(Nodes) ->
|
|
||||||
erpc:multicall(Nodes, emqx_mgmt, node_info, [], 30000).
|
|
||||||
|
|
||||||
-spec broker_info([node()]) -> emqx_rpc:erpc_multicall(map()).
|
|
||||||
broker_info(Nodes) ->
|
|
||||||
erpc:multicall(Nodes, emqx_mgmt, broker_info, [], 30000).
|
|
||||||
|
|
||||||
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
|
|
||||||
list_subscriptions(Node) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
|
||||||
|
|
||||||
-spec list_listeners(node()) -> map() | {badrpc, _}.
|
|
||||||
list_listeners(Node) ->
|
|
||||||
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
|
|
||||||
|
|
||||||
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
|
|
||||||
{subscribe, _} | {error, atom()} | {badrpc, _}.
|
|
||||||
subscribe(Node, ClientId, TopicTables) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
|
|
||||||
|
|
||||||
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
|
|
||||||
{unsubscribe, _} | {error, _} | {badrpc, _}.
|
|
||||||
unsubscribe(Node, ClientId, Topic) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
|
|
||||||
|
|
||||||
-spec call_client(node(), emqx_types:clientid(), term()) -> term().
|
|
||||||
call_client(Node, ClientId, Req) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
|
|
||||||
|
|
||||||
-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
|
|
||||||
get_full_config(Node) ->
|
|
||||||
rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
|
|
||||||
|
|
||||||
-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
|
|
||||||
kickout_clients(Node, ClientIds) ->
|
|
||||||
rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).
|
|
||||||
|
|
||||||
-spec get_full_config_v2(node()) -> map() | {badrpc, _}.
|
|
||||||
get_full_config_v2(Node) ->
|
|
||||||
rpc:call(Node, emqx_conf_cli, get_config, []).
|
|
||||||
|
|
||||||
-spec get_config_v2(node(), binary()) -> map() | {badrpc, _}.
|
|
||||||
get_config_v2(Node, Key) ->
|
|
||||||
rpc:call(Node, emqx_conf_cli, get_config, [Key]).
|
|
|
@ -273,7 +273,7 @@ t_configs_node({'init', Config}) ->
|
||||||
Node = node(),
|
Node = node(),
|
||||||
meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
|
meck:expect(emqx, running_nodes, fun() -> [Node, bad_node, other_node] end),
|
||||||
meck:expect(
|
meck:expect(
|
||||||
emqx_management_proto_v5,
|
emqx_management_proto_v4,
|
||||||
get_full_config,
|
get_full_config,
|
||||||
fun
|
fun
|
||||||
(Node0) when Node0 =:= Node -> <<"\"self\"">>;
|
(Node0) when Node0 =:= Node -> <<"\"self\"">>;
|
||||||
|
@ -283,7 +283,7 @@ t_configs_node({'init', Config}) ->
|
||||||
),
|
),
|
||||||
Config;
|
Config;
|
||||||
t_configs_node({'end', _}) ->
|
t_configs_node({'end', _}) ->
|
||||||
meck:unload([emqx, emqx_management_proto_v5]);
|
meck:unload([emqx, emqx_management_proto_v4]);
|
||||||
t_configs_node(_) ->
|
t_configs_node(_) ->
|
||||||
Node = atom_to_list(node()),
|
Node = atom_to_list(node()),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue