Merge pull request #10913 from thalesmg/fix-plugin-proto-multicall-v50
fix(plugins): use `emqx:running_nodes` for multicall operations
This commit is contained in:
commit
2d7c1da901
|
@ -29,6 +29,7 @@
|
||||||
{emqx_management,3}.
|
{emqx_management,3}.
|
||||||
{emqx_management,4}.
|
{emqx_management,4}.
|
||||||
{emqx_mgmt_api_plugins,1}.
|
{emqx_mgmt_api_plugins,1}.
|
||||||
|
{emqx_mgmt_api_plugins,2}.
|
||||||
{emqx_mgmt_cluster,1}.
|
{emqx_mgmt_cluster,1}.
|
||||||
{emqx_mgmt_trace,1}.
|
{emqx_mgmt_trace,1}.
|
||||||
{emqx_mgmt_trace,2}.
|
{emqx_mgmt_trace,2}.
|
||||||
|
|
|
@ -33,8 +33,9 @@ init_per_suite(Config) ->
|
||||||
_ = emqx_mgmt_api_test_util:init_suite([emqx_conf]),
|
_ = emqx_mgmt_api_test_util:init_suite([emqx_conf]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
|
emqx_mgmt_api_test_util:end_suite([emqx_conf]),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_object(_Config) ->
|
t_object(_Config) ->
|
||||||
Spec = #{
|
Spec = #{
|
||||||
|
|
|
@ -322,7 +322,8 @@ validate_name(Name) ->
|
||||||
|
|
||||||
%% API CallBack Begin
|
%% API CallBack Begin
|
||||||
list_plugins(get, _) ->
|
list_plugins(get, _) ->
|
||||||
{Plugins, []} = emqx_mgmt_api_plugins_proto_v1:get_plugins(),
|
Nodes = emqx:running_nodes(),
|
||||||
|
{Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes),
|
||||||
{200, format_plugins(Plugins)}.
|
{200, format_plugins(Plugins)}.
|
||||||
|
|
||||||
get_plugins() ->
|
get_plugins() ->
|
||||||
|
@ -373,7 +374,8 @@ upload_install(post, #{}) ->
|
||||||
|
|
||||||
do_install_package(FileName, Bin) ->
|
do_install_package(FileName, Bin) ->
|
||||||
%% TODO: handle bad nodes
|
%% TODO: handle bad nodes
|
||||||
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
|
Nodes = emqx:running_nodes(),
|
||||||
|
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin),
|
||||||
case lists:filter(fun(R) -> R =/= ok end, Res) of
|
case lists:filter(fun(R) -> R =/= ok end, Res) of
|
||||||
[] ->
|
[] ->
|
||||||
{200};
|
{200};
|
||||||
|
@ -386,7 +388,11 @@ do_install_package(FileName, Bin) ->
|
||||||
end,
|
end,
|
||||||
Filtered
|
Filtered
|
||||||
),
|
),
|
||||||
{error, #{error := Reason}} = hd(Filtered),
|
Reason =
|
||||||
|
case hd(Filtered) of
|
||||||
|
{error, #{error := Reason0}} -> Reason0;
|
||||||
|
{error, #{reason := Reason0}} -> Reason0
|
||||||
|
end,
|
||||||
{400, #{
|
{400, #{
|
||||||
code => 'BAD_PLUGIN_INFO',
|
code => 'BAD_PLUGIN_INFO',
|
||||||
message => iolist_to_binary([Reason, ":", FileName])
|
message => iolist_to_binary([Reason, ":", FileName])
|
||||||
|
@ -394,17 +400,18 @@ do_install_package(FileName, Bin) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
plugin(get, #{bindings := #{name := Name}}) ->
|
plugin(get, #{bindings := #{name := Name}}) ->
|
||||||
{Plugins, _} = emqx_mgmt_api_plugins_proto_v1:describe_package(Name),
|
Nodes = emqx:running_nodes(),
|
||||||
|
{Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name),
|
||||||
case format_plugins(Plugins) of
|
case format_plugins(Plugins) of
|
||||||
[Plugin] -> {200, Plugin};
|
[Plugin] -> {200, Plugin};
|
||||||
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
|
[] -> {404, #{code => 'NOT_FOUND', message => Name}}
|
||||||
end;
|
end;
|
||||||
plugin(delete, #{bindings := #{name := Name}}) ->
|
plugin(delete, #{bindings := #{name := Name}}) ->
|
||||||
Res = emqx_mgmt_api_plugins_proto_v1:delete_package(Name),
|
Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name),
|
||||||
return(204, Res).
|
return(204, Res).
|
||||||
|
|
||||||
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
|
update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
|
||||||
Res = emqx_mgmt_api_plugins_proto_v1:ensure_action(Name, Action),
|
Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action),
|
||||||
return(204, Res).
|
return(204, Res).
|
||||||
|
|
||||||
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
introduced_in/0,
|
introduced_in/0,
|
||||||
|
deprecated_since/0,
|
||||||
|
|
||||||
get_plugins/0,
|
get_plugins/0,
|
||||||
install_package/2,
|
install_package/2,
|
||||||
describe_package/1,
|
describe_package/1,
|
||||||
|
@ -31,6 +33,9 @@
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
"5.0.0".
|
"5.0.0".
|
||||||
|
|
||||||
|
deprecated_since() ->
|
||||||
|
"5.1.0".
|
||||||
|
|
||||||
-spec get_plugins() -> emqx_rpc:multicall_result().
|
-spec get_plugins() -> emqx_rpc:multicall_result().
|
||||||
get_plugins() ->
|
get_plugins() ->
|
||||||
rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000).
|
rpc:multicall(emqx_mgmt_api_plugins, get_plugins, [], 15000).
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_mgmt_api_plugins_proto_v2).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
introduced_in/0,
|
||||||
|
get_plugins/1,
|
||||||
|
install_package/3,
|
||||||
|
describe_package/2,
|
||||||
|
delete_package/1,
|
||||||
|
ensure_action/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.1.0".
|
||||||
|
|
||||||
|
-spec get_plugins([node()]) -> emqx_rpc:multicall_result().
|
||||||
|
get_plugins(Nodes) ->
|
||||||
|
rpc:multicall(Nodes, emqx_mgmt_api_plugins, get_plugins, [], 15000).
|
||||||
|
|
||||||
|
-spec install_package([node()], binary() | string(), binary()) -> emqx_rpc:multicall_result().
|
||||||
|
install_package(Nodes, Filename, Bin) ->
|
||||||
|
rpc:multicall(Nodes, emqx_mgmt_api_plugins, install_package, [Filename, Bin], 25000).
|
||||||
|
|
||||||
|
-spec describe_package([node()], binary() | string()) -> emqx_rpc:multicall_result().
|
||||||
|
describe_package(Nodes, Name) ->
|
||||||
|
rpc:multicall(Nodes, emqx_mgmt_api_plugins, describe_package, [Name], 10000).
|
||||||
|
|
||||||
|
-spec delete_package(binary() | string()) -> ok | {error, any()}.
|
||||||
|
delete_package(Name) ->
|
||||||
|
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, delete_package, [Name], all, 10000).
|
||||||
|
|
||||||
|
-spec ensure_action(binary() | string(), 'restart' | 'start' | 'stop') -> ok | {error, any()}.
|
||||||
|
ensure_action(Name, Action) ->
|
||||||
|
emqx_cluster_rpc:multicall(emqx_mgmt_api_plugins, ensure_action, [Name, Action], all, 10000).
|
|
@ -47,7 +47,8 @@ groups() ->
|
||||||
[
|
[
|
||||||
{copy_plugin, [sequence], [
|
{copy_plugin, [sequence], [
|
||||||
group_t_copy_plugin_to_a_new_node,
|
group_t_copy_plugin_to_a_new_node,
|
||||||
group_t_copy_plugin_to_a_new_node_single_node
|
group_t_copy_plugin_to_a_new_node_single_node,
|
||||||
|
group_t_cluster_leave
|
||||||
]},
|
]},
|
||||||
{create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}
|
{create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}
|
||||||
].
|
].
|
||||||
|
@ -676,6 +677,86 @@ group_t_copy_plugin_to_a_new_node_single_node(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
group_t_cluster_leave({init, Config}) ->
|
||||||
|
PrivDataDir = ?config(priv_dir, Config),
|
||||||
|
ToInstallDir = filename:join(PrivDataDir, "plugins_copy_to"),
|
||||||
|
file:del_dir_r(ToInstallDir),
|
||||||
|
ok = filelib:ensure_path(ToInstallDir),
|
||||||
|
#{package := Package, release_name := PluginName} = get_demo_plugin_package(ToInstallDir),
|
||||||
|
NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
|
||||||
|
Cluster =
|
||||||
|
emqx_common_test_helpers:emqx_cluster(
|
||||||
|
[core, core],
|
||||||
|
#{
|
||||||
|
apps => [emqx_conf, emqx_plugins],
|
||||||
|
env => [
|
||||||
|
{emqx, init_config_load_done, false},
|
||||||
|
{emqx, boot_modules, []}
|
||||||
|
],
|
||||||
|
env_handler => fun
|
||||||
|
(emqx_plugins) ->
|
||||||
|
ok = emqx_plugins:put_config(install_dir, ToInstallDir),
|
||||||
|
%% this is to simulate an user setting the state
|
||||||
|
%% via environment variables before starting the node
|
||||||
|
ok = emqx_plugins:put_config(
|
||||||
|
states,
|
||||||
|
[#{name_vsn => NameVsn, enable => true}]
|
||||||
|
),
|
||||||
|
ok;
|
||||||
|
(_) ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
priv_data_dir => PrivDataDir,
|
||||||
|
schema_mod => emqx_conf_schema,
|
||||||
|
peer_mod => slave,
|
||||||
|
load_schema => true
|
||||||
|
}
|
||||||
|
),
|
||||||
|
Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
|
||||||
|
[
|
||||||
|
{to_install_dir, ToInstallDir},
|
||||||
|
{cluster, Cluster},
|
||||||
|
{nodes, Nodes},
|
||||||
|
{name_vsn, NameVsn},
|
||||||
|
{plugin_name, PluginName}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
group_t_cluster_leave({'end', Config}) ->
|
||||||
|
Nodes = proplists:get_value(nodes, Config),
|
||||||
|
[ok = emqx_common_test_helpers:stop_slave(N) || N <- Nodes],
|
||||||
|
ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)),
|
||||||
|
ok;
|
||||||
|
group_t_cluster_leave(Config) ->
|
||||||
|
[N1, N2] = ?config(nodes, Config),
|
||||||
|
NameVsn = proplists:get_value(name_vsn, Config),
|
||||||
|
ok = erpc:call(N1, emqx_plugins, ensure_installed, [NameVsn]),
|
||||||
|
ok = erpc:call(N1, emqx_plugins, ensure_started, [NameVsn]),
|
||||||
|
ok = erpc:call(N1, emqx_plugins, ensure_enabled, [NameVsn]),
|
||||||
|
Params = unused,
|
||||||
|
%% 2 nodes running
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{running_status := [#{status := running}, #{status := running}]}]},
|
||||||
|
erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{running_status := [#{status := running}, #{status := running}]}]},
|
||||||
|
erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Now, one node leaves the cluster.
|
||||||
|
ok = erpc:call(N2, ekka, leave, []),
|
||||||
|
|
||||||
|
%% Each node will no longer ask the plugin status to the other.
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{running_status := [#{node := N1, status := running}]}]},
|
||||||
|
erpc:call(N1, emqx_mgmt_api_plugins, list_plugins, [get, Params])
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{200, [#{running_status := [#{node := N2, status := running}]}]},
|
||||||
|
erpc:call(N2, emqx_mgmt_api_plugins, list_plugins, [get, Params])
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
make_tar(Cwd, NameWithVsn) ->
|
make_tar(Cwd, NameWithVsn) ->
|
||||||
make_tar(Cwd, NameWithVsn, NameWithVsn).
|
make_tar(Cwd, NameWithVsn, NameWithVsn).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where a node that left the cluster would still report plugin status from other nodes.
|
Loading…
Reference in New Issue