From bde4215a1144d3aa368fb6912e740220df25ed59 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 7 Mar 2023 17:04:01 +0200 Subject: [PATCH] fix: copy plugins to a new node joining a cluster Closes: EMQX-8889 --- apps/emqx/priv/bpapi.versions | 1 + .../src/emqx_mgmt_api_plugins.erl | 4 +- apps/emqx_plugins/src/emqx_plugins.app.src | 2 +- apps/emqx_plugins/src/emqx_plugins.erl | 61 +++++++- .../src/proto/emqx_plugins_proto_v1.erl | 35 +++++ apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 144 ++++++++++++++++-- changes/ce/fix-10117.en.md | 2 + 7 files changed, 233 insertions(+), 16 deletions(-) create mode 100644 apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl create mode 100644 changes/ce/fix-10117.en.md diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 769145722..5f0ad5d30 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -25,6 +25,7 @@ {emqx_mgmt_trace,2}. {emqx_persistent_session,1}. {emqx_plugin_libs,1}. +{emqx_plugins,1}. {emqx_prometheus,1}. {emqx_resource,1}. {emqx_retainer,1}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index d1acfee6a..4930e587c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -323,8 +323,6 @@ get_plugins() -> upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -> [{FileName, Bin}] = maps:to_list(maps:without([type], Plugin)), %% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall - %% TODO what happens when a new node join in? - %% emqx_plugins_monitor should copy plugins from other core node when boot-up. NameVsn = string:trim(FileName, trailing, ".tar.gz"), case emqx_plugins:describe(NameVsn) of {error, #{error := "bad_info_file", return := {enoent, _}}} -> @@ -456,8 +454,8 @@ delete_package(Name) -> %% for RPC plugin update ensure_action(Name, start) -> - _ = emqx_plugins:ensure_enabled(Name), _ = emqx_plugins:ensure_started(Name), + _ = emqx_plugins:ensure_enabled(Name), ok; ensure_action(Name, stop) -> _ = emqx_plugins:ensure_stopped(Name), diff --git a/apps/emqx_plugins/src/emqx_plugins.app.src b/apps/emqx_plugins/src/emqx_plugins.app.src index ed893c80d..c0372c003 100644 --- a/apps/emqx_plugins/src/emqx_plugins.app.src +++ b/apps/emqx_plugins/src/emqx_plugins.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugins, [ {description, "EMQX Plugin Management"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, []}, {mod, {emqx_plugins_app, []}}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 8993404d4..264247086 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -47,7 +47,8 @@ -export([ get_config/2, - put_config/2 + put_config/2, + get_tar/1 ]). %% internal @@ -113,6 +114,33 @@ do_ensure_installed(NameVsn) -> }} end. +-spec get_tar(name_vsn()) -> {ok, binary()} | {error, any}. +get_tar(NameVsn) -> + TarGz = pkg_file(NameVsn), + case file:read_file(TarGz) of + {ok, Content} -> + {ok, Content}; + {error, _} -> + case maybe_create_tar(NameVsn, TarGz, install_dir()) of + ok -> + file:read_file(TarGz); + Err -> + Err + end + end. + +maybe_create_tar(NameVsn, TarGzName, InstallDir) when is_binary(InstallDir) -> + maybe_create_tar(NameVsn, TarGzName, binary_to_list(InstallDir)); +maybe_create_tar(NameVsn, TarGzName, InstallDir) -> + case filelib:wildcard(filename:join(dir(NameVsn), "**")) of + [_ | _] = PluginFiles -> + InstallDir1 = string:trim(InstallDir, trailing, "/") ++ "/", + PluginFiles1 = [{string:prefix(F, InstallDir1), F} || F <- PluginFiles], + erl_tar:create(TarGzName, PluginFiles1, [compressed]); + _ -> + {error, plugin_not_found} + end. + write_tar_file_content(BaseDir, TarContent) -> lists:foreach( fun({Name, Bin}) -> @@ -393,6 +421,7 @@ do_ensure_started(NameVsn) -> tryit( "start_plugins", fun() -> + ok = ensure_exists_and_installed(NameVsn), Plugin = do_read_plugin(NameVsn), ok = load_code_start_apps(NameVsn, Plugin) end @@ -446,6 +475,36 @@ do_read_plugin({file, InfoFile}, Options) -> do_read_plugin(NameVsn, Options) -> do_read_plugin({file, info_file(NameVsn)}, Options). +ensure_exists_and_installed(NameVsn) -> + case filelib:is_dir(dir(NameVsn)) of + true -> + ok; + _ -> + Nodes = [N || N <- mria:running_nodes(), N /= node()], + case get_from_any_node(Nodes, NameVsn, []) of + {ok, TarContent} -> + ok = file:write_file(pkg_file(NameVsn), TarContent), + ok = do_ensure_installed(NameVsn); + {error, NodeErrors} -> + ?SLOG(error, #{ + msg => "failed_to_copy_plugin_from_other_nodes", + name_vsn => NameVsn, + node_errors => NodeErrors + }), + {error, plugin_not_found} + end + end. + +get_from_any_node([], _NameVsn, Errors) -> + {error, Errors}; +get_from_any_node([Node | T], NameVsn, Errors) -> + case emqx_plugins_proto_v1:get_tar(Node, NameVsn, infinity) of + {ok, _} = Res -> + Res; + Err -> + get_from_any_node(T, NameVsn, [{Node, Err} | Errors]) + end. + plugins_readme(NameVsn, #{fill_readme := true}, Info) -> case file:read_file(readme_file(NameVsn)) of {ok, Bin} -> Info#{readme => Bin}; diff --git a/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl new file mode 100644 index 000000000..e1cd42c7b --- /dev/null +++ b/apps/emqx_plugins/src/proto/emqx_plugins_proto_v1.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugins_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + get_tar/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-type name_vsn() :: binary() | string(). + +introduced_in() -> + "5.0.21". + +-spec get_tar(node(), name_vsn(), timeout()) -> {ok, binary()} | {error, any}. +get_tar(Node, NameVsn, Timeout) -> + rpc:call(Node, emqx_plugins, get_tar, [NameVsn], Timeout). diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 9c410a1ee..f91233132 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -36,10 +36,30 @@ -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_TAG, "0.1.0-2"). -define(PACKAGE_SUFFIX, ".tar.gz"). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + [ + {group, copy_plugin}, + {group, create_tar_copy_plugin}, + emqx_common_test_helpers:all(?MODULE) + ]. + +groups() -> + [ + {copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]}, + {create_tar_copy_plugin, [sequence], [group_t_copy_plugin_to_a_new_node]} + ]. + +init_per_group(copy_plugin, Config) -> + Config; +init_per_group(create_tar_copy_plugin, Config) -> + [{remove_tar, true} | Config]. + +end_per_group(_Group, _Config) -> + ok. init_per_suite(Config) -> WorkDir = proplists:get_value(data_dir, Config), + filelib:ensure_path(WorkDir), OrigInstallDir = emqx_plugins:get_config(install_dir, undefined), emqx_common_test_helpers:start_apps([emqx_conf]), emqx_plugins:put_config(install_dir, WorkDir), @@ -71,15 +91,7 @@ end_per_testcase(TestCase, Config) -> ?MODULE:TestCase({'end', Config}). get_demo_plugin_package() -> - get_demo_plugin_package( - #{ - release_name => ?EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, - git_url => ?EMQX_PLUGIN_TEMPLATE_URL, - vsn => ?EMQX_PLUGIN_TEMPLATE_VSN, - tag => ?EMQX_PLUGIN_TEMPLATE_TAG, - shdir => emqx_plugins:install_dir() - } - ). + get_demo_plugin_package(emqx_plugins:install_dir()). get_demo_plugin_package( #{ @@ -98,7 +110,17 @@ get_demo_plugin_package( TargetName ]), ok = file:write_file(Pkg, PluginBin), - Opts#{package => Pkg}. + Opts#{package => Pkg}; +get_demo_plugin_package(Dir) -> + get_demo_plugin_package( + #{ + release_name => ?EMQX_PLUGIN_TEMPLATE_RELEASE_NAME, + git_url => ?EMQX_PLUGIN_TEMPLATE_URL, + vsn => ?EMQX_PLUGIN_TEMPLATE_VSN, + tag => ?EMQX_PLUGIN_TEMPLATE_TAG, + shdir => Dir + } + ). bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(L) when is_list(L) -> unicode:characters_to_binary(L, utf8); @@ -479,6 +501,106 @@ t_elixir_plugin(Config) -> ?assertEqual([], emqx_plugins:list()), ok. +group_t_copy_plugin_to_a_new_node({init, Config}) -> + WorkDir = proplists:get_value(data_dir, Config), + FromInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_from)), + file:del_dir_r(FromInstallDir), + ok = filelib:ensure_path(FromInstallDir), + ToInstallDir = filename:join(WorkDir, atom_to_list(plugins_copy_to)), + file:del_dir_r(ToInstallDir), + ok = filelib:ensure_path(ToInstallDir), + #{package := Package, release_name := PluginName} = get_demo_plugin_package(FromInstallDir), + [{CopyFrom, CopyFromOpts}, {CopyTo, CopyToOpts}] = + emqx_common_test_helpers:emqx_cluster( + [ + {core, plugins_copy_from}, + {core, plugins_copy_to} + ], + #{ + apps => [emqx_conf, emqx_plugins], + env => [ + {emqx, init_config_load_done, false}, + {emqx, boot_modules, []} + ], + load_schema => false + } + ), + CopyFromNode = emqx_common_test_helpers:start_slave( + CopyFrom, maps:remove(join_to, CopyFromOpts) + ), + ok = rpc:call(CopyFromNode, emqx_plugins, put_config, [install_dir, FromInstallDir]), + CopyToNode = emqx_common_test_helpers:start_slave(CopyTo, maps:remove(join_to, CopyToOpts)), + ok = rpc:call(CopyToNode, emqx_plugins, put_config, [install_dir, ToInstallDir]), + NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), + ok = rpc:call(CopyFromNode, emqx_plugins, ensure_installed, [NameVsn]), + ok = rpc:call(CopyFromNode, emqx_plugins, ensure_started, [NameVsn]), + ok = rpc:call(CopyFromNode, emqx_plugins, ensure_enabled, [NameVsn]), + case proplists:get_bool(remove_tar, Config) of + true -> + %% Test the case when a plugin is installed, but its original tar file is removed + %% and must be re-created + ok = file:delete(filename:join(FromInstallDir, NameVsn ++ ?PACKAGE_SUFFIX)); + false -> + ok + end, + [ + {from_install_dir, FromInstallDir}, + {to_install_dir, ToInstallDir}, + {copy_from_node, CopyFromNode}, + {copy_to_node, CopyToNode}, + {name_vsn, NameVsn}, + {plugin_name, PluginName} + | Config + ]; +group_t_copy_plugin_to_a_new_node({'end', Config}) -> + CopyFromNode = proplists:get_value(copy_from_node, Config), + CopyToNode = proplists:get_value(copy_to_node, Config), + ok = rpc:call(CopyFromNode, emqx_config, delete_override_conf_files, []), + ok = rpc:call(CopyToNode, emqx_config, delete_override_conf_files, []), + rpc:call(CopyToNode, ekka, leave, []), + rpc:call(CopyFromNode, ekka, leave, []), + {ok, _} = emqx_common_test_helpers:stop_slave(CopyToNode), + {ok, _} = emqx_common_test_helpers:stop_slave(CopyFromNode), + ok = file:del_dir_r(proplists:get_value(to_install_dir, Config)), + ok = file:del_dir_r(proplists:get_value(from_install_dir, Config)); +group_t_copy_plugin_to_a_new_node(Config) -> + CopyFromNode = proplists:get_value(copy_from_node, Config), + CopyToNode = proplists:get_value(copy_to_node, Config), + CopyToDir = proplists:get_value(to_install_dir, Config), + CopyFromPluginsState = rpc:call(CopyFromNode, emqx_plugins, get_config, [[states], []]), + NameVsn = proplists:get_value(name_vsn, Config), + PluginName = proplists:get_value(plugin_name, Config), + PluginApp = list_to_atom(PluginName), + ?assertMatch([#{enable := true, name_vsn := NameVsn}], CopyFromPluginsState), + ?assert( + proplists:is_defined( + PluginApp, + rpc:call(CopyFromNode, application, which_applications, []) + ) + ), + ?assertEqual([], filelib:wildcard(filename:join(CopyToDir, "**"))), + %% Check that a new node doesn't have this plugin before it joins the cluster + ?assertEqual([], rpc:call(CopyToNode, emqx_conf, get, [[plugins, states], []])), + ?assertMatch({error, _}, rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn])), + ?assertNot( + proplists:is_defined( + PluginApp, + rpc:call(CopyToNode, application, which_applications, []) + ) + ), + ok = rpc:call(CopyToNode, ekka, join, [CopyFromNode]), + %% Mimic cluster-override conf copying + ok = rpc:call(CopyToNode, emqx_plugins, put_config, [[states], CopyFromPluginsState]), + %% Plugin copying is triggered upon app restart on a new node. + %% This is similar to emqx_conf, which copies cluster-override conf upon start, + %% see: emqx_conf_app:init_conf/0 + ok = rpc:call(CopyToNode, application, stop, [emqx_plugins]), + {ok, _} = rpc:call(CopyToNode, application, ensure_all_started, [emqx_plugins]), + ?assertMatch( + {ok, #{running_status := running, config_status := enabled}}, + rpc:call(CopyToNode, emqx_plugins, describe, [NameVsn]) + ). + make_tar(Cwd, NameWithVsn) -> make_tar(Cwd, NameWithVsn, NameWithVsn). diff --git a/changes/ce/fix-10117.en.md b/changes/ce/fix-10117.en.md new file mode 100644 index 000000000..711d739ca --- /dev/null +++ b/changes/ce/fix-10117.en.md @@ -0,0 +1,2 @@ +Fix an error occurring when a joining node doesn't have plugins that are installed on other nodes in the cluster. +After this change, the joining node will copy all the necessary plugins from other nodes.