From 5f526d548a79a3ec11843c954094ac665572d2a2 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 15 May 2023 10:37:15 +0300 Subject: [PATCH 1/4] style: use double percent (%%) comments --- apps/emqx/src/emqx_ssl_crl_cache.erl | 8 ++++---- .../test/emqx_bridge_rabbitmq_SUITE.erl | 2 +- .../src/emqx_bridge_rocketmq_connector.erl | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_ssl_crl_cache.erl b/apps/emqx/src/emqx_ssl_crl_cache.erl index 13eccbd83..94d5e0697 100644 --- a/apps/emqx/src/emqx_ssl_crl_cache.erl +++ b/apps/emqx/src/emqx_ssl_crl_cache.erl @@ -33,11 +33,11 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%---------------------------------------------------------------------- -% Based on `otp/lib/ssl/src/ssl_crl_cache.erl' -%---------------------------------------------------------------------- +%%---------------------------------------------------------------------- +%% Based on `otp/lib/ssl/src/ssl_crl_cache.erl' +%%---------------------------------------------------------------------- -%---------------------------------------------------------------------- +%%---------------------------------------------------------------------- %% Purpose: Simple default CRL cache %%---------------------------------------------------------------------- diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl index 45a8693e6..e6a6c03fb 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_SUITE.erl @@ -1,4 +1,4 @@ -%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..b8034db40 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -1,4 +1,4 @@ -%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- From 33fa053b9b8e61c5efcb8679fd71c1d621a0f227 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Wed, 31 May 2023 20:20:54 +0300 Subject: [PATCH 2/4] test(emqx_common_test_helpers): keep EMQX ENV vars on a child node for its lifetime --- apps/emqx/test/emqx_common_test_helpers.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index e545bb624..df69d4a13 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -859,6 +859,12 @@ setup_node(Node, Opts) when is_map(Opts) -> %% Setting env before starting any applications set_envs(Node, Env), + NodeDataDir = filename:join([ + PrivDataDir, + node(), + integer_to_list(erlang:unique_integer()) + ]), + %% Here we start the apps EnvHandlerForRpc = fun(App) -> @@ -870,17 +876,10 @@ setup_node(Node, Opts) when is_map(Opts) -> %% to avoid sharing data between executions and/or %% nodes. these variables might not be in the %% config file (e.g.: emqx_enterprise_schema). - NodeDataDir = filename:join([ - PrivDataDir, - node(), - integer_to_list(erlang:unique_integer()) - ]), Cookie = atom_to_list(erlang:get_cookie()), os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir), os:putenv("EMQX_NODE__COOKIE", Cookie), emqx_config:init_load(SchemaMod), - os:unsetenv("EMQX_NODE__DATA_DIR"), - os:unsetenv("EMQX_NODE__COOKIE"), application:set_env(emqx, init_config_load_done, true) end, From 53ec8326b0a704fe62e435f34806047eaf055810 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Thu, 8 Jun 2023 20:21:54 +0300 Subject: [PATCH 3/4] fix(emqx_listeners): convert authentication certs --- apps/emqx/src/emqx_listeners.erl | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index f560c9ce9..79a438a54 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -843,7 +843,9 @@ convert_certs(ListenerConf) -> Listeners1 = maps:fold( fun(Name, Conf, Acc1) -> - Acc1#{Name => convert_certs(Type, Name, Conf)} + Conf1 = convert_certs(Type, Name, Conf), + Conf2 = convert_authn_certs(Type, Name, Conf1), + Acc1#{Name => Conf2} end, #{}, Listeners0 @@ -866,6 +868,19 @@ convert_certs(Type, Name, Conf) -> throw({bad_ssl_config, Reason}) end. +convert_authn_certs(Type, Name, #{<<"authentication">> := AuthNList} = Conf) -> + ChainName = listener_id(Type, Name), + AuthNList1 = lists:map( + fun(AuthN) -> + CertsDir = emqx_authentication_config:certs_dir(ChainName, AuthN), + emqx_authentication_config:convert_certs(CertsDir, AuthN) + end, + AuthNList + ), + Conf#{<<"authentication">> => AuthNList1}; +convert_authn_certs(_Type, _Name, Conf) -> + Conf. + filter_stacktrace({Reason, _Stacktrace}) -> Reason; filter_stacktrace(Reason) -> Reason. From e4d09d4ad412cb3f9849f092a15ca2d017e766a1 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 9 May 2023 20:44:30 +0300 Subject: [PATCH 4/4] feat: implement configuration and user data export/import CLI Closes: EMQX-9203 --- apps/emqx/src/bhvrs/emqx_config_backup.erl | 24 + apps/emqx/src/bhvrs/emqx_db_backup.erl | 19 + apps/emqx/src/emqx_banned.erl | 8 + apps/emqx_authn/src/emqx_authn.erl | 36 + .../emqx_enhanced_authn_scram_mnesia.erl | 12 + .../src/simple_authn/emqx_authn_mnesia.erl | 12 +- apps/emqx_authz/src/emqx_authz.erl | 66 +- apps/emqx_authz/src/emqx_authz_mnesia.erl | 9 + .../src/emqx_auto_subscribe.erl | 38 +- apps/emqx_bridge/src/emqx_bridge.erl | 96 ++- .../src/emqx_dashboard_admin.erl | 10 + apps/emqx_exhook/src/emqx_exhook_mgr.erl | 43 +- apps/emqx_gateway/src/emqx_gateway_conf.erl | 39 +- .../src/emqx_mgmt_api_api_keys.erl | 19 +- .../src/emqx_mgmt_api_listeners.erl | 44 +- apps/emqx_management/src/emqx_mgmt_auth.erl | 24 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 35 +- .../src/emqx_mgmt_data_backup.erl | 690 ++++++++++++++++++ .../src/emqx_mgmt_listeners_conf.erl | 96 +++ .../test/emqx_mgmt_data_backup_SUITE.erl | 519 +++++++++++++ .../emqx-export-test-bootstrap-ce.tar.gz | Bin 0 -> 11822 bytes apps/emqx_modules/src/emqx_delayed.erl | 24 +- apps/emqx_modules/src/emqx_rewrite.erl | 18 +- apps/emqx_psk/src/emqx_psk.app.src | 2 +- apps/emqx_psk/src/emqx_psk.erl | 36 +- apps/emqx_retainer/src/emqx_retainer.erl | 20 +- .../emqx_rule_engine/src/emqx_rule_engine.erl | 46 +- apps/emqx_utils/src/emqx_utils.erl | 39 +- changes/ce/feat-10676.en.md | 4 + .../src/emqx_ee_schema_registry.erl | 53 +- .../src/emqx_ee_schema_registry_app.erl | 4 + .../test/emqx_ee_schema_registry_SUITE.erl | 31 + 32 files changed, 1990 insertions(+), 126 deletions(-) create mode 100644 apps/emqx/src/bhvrs/emqx_config_backup.erl create mode 100644 apps/emqx/src/bhvrs/emqx_db_backup.erl create mode 100644 apps/emqx_management/src/emqx_mgmt_data_backup.erl create mode 100644 apps/emqx_management/src/emqx_mgmt_listeners_conf.erl create mode 100644 apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl create mode 100644 apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-test-bootstrap-ce.tar.gz create mode 100644 changes/ce/feat-10676.en.md diff --git a/apps/emqx/src/bhvrs/emqx_config_backup.erl b/apps/emqx/src/bhvrs/emqx_config_backup.erl new file mode 100644 index 000000000..604fef106 --- /dev/null +++ b/apps/emqx/src/bhvrs/emqx_config_backup.erl @@ -0,0 +1,24 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_config_backup). + +-callback import_config(RawConf :: map()) -> + {ok, #{ + root_key => emqx_utils_maps:config_key(), + changed => [emqx_utils_maps:config_path()] + }} + | {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}. diff --git a/apps/emqx/src/bhvrs/emqx_db_backup.erl b/apps/emqx/src/bhvrs/emqx_db_backup.erl new file mode 100644 index 000000000..fddbdb1d0 --- /dev/null +++ b/apps/emqx/src/bhvrs/emqx_db_backup.erl @@ -0,0 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_db_backup). + +-callback backup_tables() -> [mria:table()]. diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index a0ccd93d7..a5c46da19 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -17,6 +17,7 @@ -module(emqx_banned). -behaviour(gen_server). +-behaviour(emqx_db_backup). -include("emqx.hrl"). -include("logger.hrl"). @@ -50,6 +51,8 @@ code_change/3 ]). +-export([backup_tables/0]). + %% Internal exports (RPC) -export([ expire_banned_items/1 @@ -82,6 +85,11 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}]}]} ]). +%%-------------------------------------------------------------------- +%% Data backup +%%-------------------------------------------------------------------- +backup_tables() -> [?BANNED_TAB]. + %% @doc Start the banned server. -spec start_link() -> startlink_ret(). start_link() -> diff --git a/apps/emqx_authn/src/emqx_authn.erl b/apps/emqx_authn/src/emqx_authn.erl index 515c3bfd6..2a8d82439 100644 --- a/apps/emqx_authn/src/emqx_authn.erl +++ b/apps/emqx_authn/src/emqx_authn.erl @@ -16,6 +16,8 @@ -module(emqx_authn). +-behaviour(emqx_config_backup). + -export([ providers/0, check_config/1, @@ -24,6 +26,11 @@ get_enabled_authns/0 ]). +%% Data backup +-export([ + import_config/1 +]). + -include("emqx_authn.hrl"). providers() -> @@ -126,3 +133,32 @@ get_enabled_authns() -> tally_authenticators(#{id := AuthenticatorName}, Acc) -> maps:update_with(AuthenticatorName, fun(N) -> N + 1 end, 1, Acc). + +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +-define(IMPORT_OPTS, #{override_to => cluster}). + +import_config(RawConf) -> + AuthnList = authn_list(maps:get(?CONF_NS_BINARY, RawConf, [])), + OldAuthnList = emqx:get_raw_config([?CONF_NS_BINARY], []), + MergedAuthnList = emqx_utils:merge_lists( + OldAuthnList, AuthnList, fun emqx_authentication:authenticator_id/1 + ), + case emqx_conf:update([?CONF_NS_ATOM], MergedAuthnList, ?IMPORT_OPTS) of + {ok, #{raw_config := NewRawConf}} -> + {ok, #{root_key => ?CONF_NS_ATOM, changed => changed_paths(OldAuthnList, NewRawConf)}}; + Error -> + {error, #{root_key => ?CONF_NS_ATOM, reason => Error}} + end. + +changed_paths(OldAuthnList, NewAuthnList) -> + KeyFun = fun emqx_authentication:authenticator_id/1, + Changed = maps:get(changed, emqx_utils:diff_lists(NewAuthnList, OldAuthnList, KeyFun)), + [[?CONF_NS_BINARY, emqx_authentication:authenticator_id(OldAuthn)] || {OldAuthn, _} <- Changed]. + +authn_list(Authn) when is_list(Authn) -> + Authn; +authn_list(Authn) when is_map(Authn) -> + [Authn]. diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index b11b89081..158112747 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -22,6 +22,7 @@ -behaviour(hocon_schema). -behaviour(emqx_authentication). +-behaviour(emqx_db_backup). -export([ namespace/0, @@ -54,6 +55,8 @@ group_match_spec/1 ]). +-export([backup_tables/0]). + %% Internal exports (RPC) -export([ do_destroy/1, @@ -101,6 +104,12 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}]}]} ]). +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +backup_tables() -> [?TAB]. + %%------------------------------------------------------------------------------ %% Hocon Schema %%------------------------------------------------------------------------------ @@ -357,6 +366,9 @@ check_client_final_message(Bin, #{is_superuser := IsSuperuser} = Cache, #{algori add_user(UserGroup, UserID, Password, IsSuperuser, State) -> {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State), + write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser). + +write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser) -> UserInfo = #user_info{ user_id = {UserGroup, UserID}, stored_key = StoredKey, diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index d57e9e00e..bf0b04d04 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -23,6 +23,7 @@ -behaviour(hocon_schema). -behaviour(emqx_authentication). +-behaviour(emqx_db_backup). -export([ namespace/0, @@ -66,6 +67,10 @@ import_csv/3 ]). +-export([mnesia/1]). + +-export([backup_tables/0]). + -type user_group() :: binary(). -type user_id() :: binary(). @@ -76,8 +81,6 @@ is_superuser :: boolean() }). --export([mnesia/1]). - -boot_mnesia({mnesia, [boot]}). -define(TAB, ?MODULE). @@ -103,6 +106,11 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}]}]} ]). +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ +backup_tables() -> [?TAB]. + %%------------------------------------------------------------------------------ %% Hocon Schema %%------------------------------------------------------------------------------ diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 278b70d6d..3c9698de0 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -15,7 +15,9 @@ %%-------------------------------------------------------------------- -module(emqx_authz). + -behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). -include("emqx_authz.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -44,6 +46,13 @@ -export([acl_conf_file/0]). +%% Data backup +-export([ + import_config/1, + maybe_read_acl_file/1, + maybe_write_acl_file/1 +]). + -type source() :: map(). -type match_result() :: {matched, allow} | {matched, deny} | nomatch. @@ -326,9 +335,9 @@ init_metrics(Source) -> ) end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% AuthZ callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Check AuthZ -spec authorize( @@ -451,9 +460,58 @@ do_authorize( get_enabled_authzs() -> lists:usort([Type || #{type := Type, enable := true} <- lookup()]). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +import_config(#{?CONF_NS_BINARY := AuthzConf}) -> + Sources = maps:get(<<"sources">>, AuthzConf, []), + OldSources = emqx:get_raw_config(?CONF_KEY_PATH, []), + MergedSources = emqx_utils:merge_lists(OldSources, Sources, fun type/1), + MergedAuthzConf = AuthzConf#{<<"sources">> => MergedSources}, + case emqx_conf:update([?CONF_NS_ATOM], MergedAuthzConf, #{override_to => cluster}) of + {ok, #{raw_config := #{<<"sources">> := NewSources}}} -> + {ok, #{ + root_key => ?CONF_NS_ATOM, + changed => changed_paths(OldSources, NewSources) + }}; + Error -> + {error, #{root_key => ?CONF_NS_ATOM, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => ?CONF_NS_ATOM, changed => []}}. + +changed_paths(OldSources, NewSources) -> + Changed = maps:get(changed, emqx_utils:diff_lists(NewSources, OldSources, fun type/1)), + [?CONF_KEY_PATH ++ [type(OldSource)] || {OldSource, _} <- Changed]. + +maybe_read_acl_file(RawConf) -> + maybe_convert_acl_file(RawConf, fun read_acl_file/1). + +maybe_write_acl_file(RawConf) -> + maybe_convert_acl_file(RawConf, fun write_acl_file/1). + +maybe_convert_acl_file( + #{?CONF_NS_BINARY := #{<<"sources">> := Sources} = AuthRawConf} = RawConf, Fun +) -> + Sources1 = lists:map( + fun + (#{<<"type">> := <<"file">>} = FileSource) -> Fun(FileSource); + (Source) -> Source + end, + Sources + ), + RawConf#{?CONF_NS_BINARY => AuthRawConf#{<<"sources">> => Sources1}}; +maybe_convert_acl_file(RawConf, _Fun) -> + RawConf. + +read_acl_file(#{<<"path">> := Path} = Source) -> + {ok, Rules} = emqx_authz_file:read_file(Path), + maps:remove(<<"path">>, Source#{<<"rules">> => Rules}). + +%%------------------------------------------------------------------------------ %% Internal function -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ client_info_source() -> emqx_authz_client_info:create( diff --git a/apps/emqx_authz/src/emqx_authz_mnesia.erl b/apps/emqx_authz/src/emqx_authz_mnesia.erl index 58df08653..bdb4877c0 100644 --- a/apps/emqx_authz/src/emqx_authz_mnesia.erl +++ b/apps/emqx_authz/src/emqx_authz_mnesia.erl @@ -42,6 +42,7 @@ }). -behaviour(emqx_authz). +-behaviour(emqx_db_backup). %% AuthZ Callbacks -export([ @@ -65,6 +66,8 @@ record_count/0 ]). +-export([backup_tables/0]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -119,6 +122,12 @@ authorize( end, do_authorize(Client, PubSub, Topic, Rules). +%%-------------------------------------------------------------------- +%% Data backup +%%-------------------------------------------------------------------- + +backup_tables() -> [?ACL_TABLE]. + %%-------------------------------------------------------------------- %% Management API %%-------------------------------------------------------------------- diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 32892992a..f8ecde84b 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -16,6 +16,8 @@ -module(emqx_auto_subscribe). +-behaviour(emqx_config_backup). + -include_lib("emqx/include/emqx_hooks.hrl"). -behaviour(emqx_config_handler). @@ -24,7 +26,6 @@ -define(MAX_AUTO_SUBSCRIBE, 20). -% -export([load/0, unload/0]). -export([ @@ -40,6 +41,11 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). +%% Data backup +-export([ + import_config/1 +]). + load() -> ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE), update_hook(). @@ -73,8 +79,9 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) -> Config = emqx_conf:get([auto_subscribe], #{}), update_hook(Config#{topics => NewTopics}). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% hook +%%------------------------------------------------------------------------------ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) -> case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of @@ -87,17 +94,38 @@ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) -> on_client_connected(_, _, _) -> ok. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Telemetry -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}. get_basic_usage_info() -> AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []), #{auto_subscribe_count => length(AutoSubscribe)}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +import_config(#{<<"auto_subscribe">> := #{<<"topics">> := Topics}}) -> + ConfPath = [auto_subscribe, topics], + OldTopics = emqx:get_raw_config(ConfPath, []), + KeyFun = fun(#{<<"topic">> := T}) -> T end, + MergedTopics = emqx_utils:merge_lists(OldTopics, Topics, KeyFun), + case emqx_conf:update(ConfPath, MergedTopics, #{override_to => cluster}) of + {ok, #{raw_config := NewTopics}} -> + Changed = maps:get(changed, emqx_utils:diff_lists(NewTopics, OldTopics, KeyFun)), + Changed1 = [ConfPath ++ [T] || {#{<<"topic">> := T}, _} <- Changed], + {ok, #{root_key => auto_subscribe, changed => Changed1}}; + Error -> + {error, #{root_key => auto_subscribe, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => auto_subscribe, changed => []}}. + +%%------------------------------------------------------------------------------ %% internal +%%------------------------------------------------------------------------------ format(Rules) when is_list(Rules) -> [format(Rule) || Rule <- Rules]; diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index e282c3dd8..9bdc1b3c2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -14,13 +14,19 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge). + -behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). + -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --export([post_config_update/5]). +-export([ + pre_config_update/3, + post_config_update/5 +]). -export([ load_hook/0, @@ -53,6 +59,11 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). +%% Data backup +-export([ + import_config/1 +]). + -define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql; @@ -80,8 +91,10 @@ T == iotdb ). +-define(ROOT_KEY, bridges). + load() -> - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( fun({Type, NamedConf}) -> lists:foreach( @@ -98,7 +111,7 @@ load() -> unload() -> unload_hook(), - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx:get_config([?ROOT_KEY], #{}), lists:foreach( fun({Type, NamedConf}) -> lists:foreach( @@ -139,7 +152,7 @@ reload_hook(Bridges) -> ok = load_hook(Bridges). load_hook() -> - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx:get_config([?ROOT_KEY], #{}), load_hook(Bridges). load_hook(Bridges) -> @@ -210,7 +223,7 @@ send_message(BridgeId, Message) -> send_message(BridgeType, BridgeName, ResId, Message). send_message(BridgeType, BridgeName, ResId, Message) -> - case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of + case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of not_found -> {error, bridge_not_found}; #{enable := true} = Config -> @@ -231,9 +244,14 @@ query_opts(Config) -> end. config_key_path() -> - [bridges]. + [?ROOT_KEY]. -post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> +pre_config_update([?ROOT_KEY], RawConf, RawConf) -> + {ok, RawConf}; +pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> + {ok, convert_certs(NewConf)}. + +post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. @@ -351,10 +369,74 @@ check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> remove(BridgeType, BridgeName) end. +%%---------------------------------------------------------------------------------------- +%% Data backup +%%---------------------------------------------------------------------------------------- + +import_config(RawConf) -> + RootKeyPath = config_key_path(), + BridgesConf = maps:get(<<"bridges">>, RawConf, #{}), + OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}), + MergedConf = merge_confs(OldBridgesConf, BridgesConf), + case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of + {ok, #{raw_config := NewRawConf}} -> + {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}}; + Error -> + {error, #{root_key => ?ROOT_KEY, reason => Error}} + end. + +merge_confs(OldConf, NewConf) -> + AllTypes = maps:keys(maps:merge(OldConf, NewConf)), + lists:foldr( + fun(Type, Acc) -> + NewBridges = maps:get(Type, NewConf, #{}), + OldBridges = maps:get(Type, OldConf, #{}), + Acc#{Type => maps:merge(OldBridges, NewBridges)} + end, + #{}, + AllTypes + ). + +changed_paths(OldRawConf, NewRawConf) -> + maps:fold( + fun(Type, Bridges, ChangedAcc) -> + OldBridges = maps:get(Type, OldRawConf, #{}), + Changed = maps:get(changed, emqx_utils_maps:diff_maps(Bridges, OldBridges)), + [[?ROOT_KEY, Type, K] || K <- maps:keys(Changed)] ++ ChangedAcc + end, + [], + NewRawConf + ). + %%======================================================================================== %% Helper functions %%======================================================================================== +convert_certs(BridgesConf) -> + maps:map( + fun(Type, Bridges) -> + maps:map( + fun(Name, BridgeConf) -> + Path = filename:join([?ROOT_KEY, Type, Name]), + case emqx_connector_ssl:convert_certs(Path, BridgeConf) of + {error, Reason} -> + ?SLOG(error, #{ + msg => "bad_ssl_config", + type => Type, + name => Name, + reason => Reason + }), + throw({bad_ssl_config, Reason}); + {ok, BridgeConf1} -> + BridgeConf1 + end + end, + Bridges + ) + end, + BridgesConf + ). + perform_bridge_changes(Tasks) -> perform_bridge_changes(Tasks, ok). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index aaa43d621..e8f95d609 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -24,6 +24,8 @@ -boot_mnesia({mnesia, [boot]}). +-behaviour(emqx_db_backup). + %% Mnesia bootstrap -export([mnesia/1]). @@ -54,6 +56,8 @@ default_username/0 ]). +-export([backup_tables/0]). + -type emqx_admin() :: #?ADMIN{}. -define(BOOTSTRAP_USER_TAG, <<"bootstrap user">>). @@ -76,6 +80,12 @@ mnesia(boot) -> ]} ]). +%%-------------------------------------------------------------------- +%% Data backup +%%-------------------------------------------------------------------- + +backup_tables() -> [?ADMIN]. + %%-------------------------------------------------------------------- %% bootstrap API %%-------------------------------------------------------------------- diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 80a508f62..6ff5350e2 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -18,6 +18,7 @@ -module(emqx_exhook_mgr). -behaviour(gen_server). +-behaviour(emqx_config_backup). -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -66,6 +67,11 @@ -export([roots/0]). +%% Data backup +-export([ + import_config/1 +]). + %% Running servers -type state() :: #{servers := servers()}. @@ -98,9 +104,9 @@ -export_type([servers/0, server/0]). -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- %% APIs -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- -spec start_link() -> ignore @@ -137,7 +143,7 @@ call(Req) -> init_ref_counter_table() -> _ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]). -%%===================================================================== +%%======================================================================================== %% Hocon schema roots() -> emqx_exhook_schema:server_config(). @@ -179,9 +185,30 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) -> Result = call({update_config, UpdateReq, NewConf, OldConf}), {ok, Result}. -%%-------------------------------------------------------------------- +%%======================================================================================== + +%%---------------------------------------------------------------------------------------- +%% Data backup +%%---------------------------------------------------------------------------------------- + +import_config(#{<<"exhook">> := #{<<"servers">> := Servers} = ExHook}) -> + OldServers = emqx:get_raw_config(?SERVERS, []), + KeyFun = fun(#{<<"name">> := Name}) -> Name end, + ExHook1 = ExHook#{<<"servers">> => emqx_utils:merge_lists(OldServers, Servers, KeyFun)}, + case emqx_conf:update(?EXHOOK, ExHook1, #{override_to => cluster}) of + {ok, #{raw_config := #{<<"servers">> := NewRawServers}}} -> + Changed = maps:get(changed, emqx_utils:diff_lists(NewRawServers, OldServers, KeyFun)), + ChangedPaths = [?SERVERS ++ [Name] || {#{<<"name">> := Name}, _} <- Changed], + {ok, #{root_key => ?EXHOOK, changed => ChangedPaths}}; + Error -> + {error, #{root_key => ?EXHOOK, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => ?EXHOOK, changed => []}}. + +%%---------------------------------------------------------------------------------------- %% gen_server callbacks -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), @@ -333,9 +360,9 @@ terminate(Reason, State = #{servers := Servers}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- %% Internal funcs -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- unload_exhooks() -> [ @@ -572,7 +599,7 @@ update_servers(Servers, State) -> set_disable(Server) -> Server#{status := disabled, timer := undefined}. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- %% Server state persistent save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index ed7f6cf9a..2a64a6914 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -18,6 +18,7 @@ -module(emqx_gateway_conf). -behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). %% Load/Unload -export([ @@ -64,6 +65,11 @@ post_config_update/5 ]). +%% Data backup +-export([ + import_config/1 +]). + -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_authentication.hrl"). -define(AUTHN_BIN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY). @@ -76,9 +82,9 @@ -define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)). -define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]). -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- %% Load/Unload -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- -define(GATEWAY, [gateway]). -spec load() -> ok. @@ -89,7 +95,7 @@ load() -> unload() -> emqx_conf:remove_handler(?GATEWAY). -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- %% APIs -spec load_gateway(atom_or_bin(), map()) -> map_or_err(). @@ -365,9 +371,26 @@ ret_listener_or_err(GwName, {LType, LName}, {ok, #{raw_config := GwConf}}) -> ret_listener_or_err(_, _, Err) -> Err. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- +%% Data backup +%%---------------------------------------------------------------------------------------- + +import_config(RawConf) -> + GatewayConf = maps:get(<<"gateway">>, RawConf, #{}), + OldGatewayConf = emqx:get_raw_config([<<"gateway">>], #{}), + MergedConf = maps:merge(OldGatewayConf, GatewayConf), + case emqx_conf:update([gateway], MergedConf, #{override_to => cluster}) of + {ok, #{raw_config := NewRawConf}} -> + Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, OldGatewayConf)), + ChangedPaths = [[gateway, GwName] || GwName <- maps:keys(Changed)], + {ok, #{root_key => gateway, changed => ChangedPaths}}; + Error -> + {error, #{root_key => gateway, reason => Error}} + end. + +%%---------------------------------------------------------------------------------------- %% Config Handler -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- -spec pre_config_update( list(atom()), @@ -793,9 +816,9 @@ post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) -> ), ok. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------------------- +%% Internal funcs +%%---------------------------------------------------------------------------------------- tune_gw_certs(Fun, GwName, Conf) -> apply_to_gateway_basic_confs( diff --git a/apps/emqx_management/src/emqx_mgmt_api_api_keys.erl b/apps/emqx_management/src/emqx_mgmt_api_api_keys.erl index ba21adaa5..432734688 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_api_keys.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_api_keys.erl @@ -183,7 +183,7 @@ delete(Keys, Fields) -> lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys). api_key(get, _) -> - {200, [format(App) || App <- emqx_mgmt_auth:list()]}; + {200, [emqx_mgmt_auth:format(App) || App <- emqx_mgmt_auth:list()]}; api_key(post, #{body := App}) -> #{ <<"name">> := Name, @@ -194,7 +194,7 @@ api_key(post, #{body := App}) -> Desc = unicode:characters_to_binary(Desc0, unicode), case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of {ok, NewApp} -> - {200, format(NewApp)}; + {200, emqx_mgmt_auth:format(NewApp)}; {error, Reason} -> {400, #{ code => 'BAD_REQUEST', @@ -206,7 +206,7 @@ api_key(post, #{body := App}) -> api_key_by_name(get, #{bindings := #{name := Name}}) -> case emqx_mgmt_auth:read(Name) of - {ok, App} -> {200, format(App)}; + {ok, App} -> {200, emqx_mgmt_auth:format(App)}; {error, not_found} -> {404, ?NOT_FOUND_RESPONSE} end; api_key_by_name(delete, #{bindings := #{name := Name}}) -> @@ -219,20 +219,9 @@ api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) -> ExpiredAt = ensure_expired_at(Body), Desc = maps:get(<<"desc">>, Body, undefined), case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of - {ok, App} -> {200, format(App)}; + {ok, App} -> {200, emqx_mgmt_auth:format(App)}; {error, not_found} -> {404, ?NOT_FOUND_RESPONSE} end. -format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) -> - ExpiredAt = - case ExpiredAt0 of - infinity -> <<"infinity">>; - _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0)) - end, - App#{ - expired_at => ExpiredAt, - created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt)) - }. - ensure_expired_at(#{<<"expired_at">> := ExpiredAt}) when is_integer(ExpiredAt) -> ExpiredAt; ensure_expired_at(_) -> infinity. diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 1f1dda5f2..5c2419ccf 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -19,7 +19,6 @@ -behaviour(minirest_api). -export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]). --import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). -export([ listener_type_status/2, @@ -36,6 +35,16 @@ do_list_listeners/0 ]). +-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]). + +-import(emqx_mgmt_listeners_conf, [ + action/4, + create/3, + ensure_remove/2, + get_raw/2, + update/3 +]). + -include_lib("emqx/include/emqx.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -44,7 +53,6 @@ -define(LISTENER_NOT_FOUND, <<"Listener id not found">>). -define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>). -define(ADDR_PORT_INUSE, <<"Addr port in use">>). --define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}). namespace() -> "listeners". @@ -387,14 +395,13 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) -> crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) -> case parse_listener_conf(Body0) of {Id, Type, Name, Conf} -> - Path = [listeners, Type, Name], - case emqx_conf:get_raw(Path, undefined) of + case get_raw(Type, Name) of undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; PrevConf -> MergeConfT = emqx_utils_maps:deep_merge(PrevConf, Conf), MergeConf = emqx_listeners:ensure_override_limiter_conf(MergeConfT, Conf), - case update(Path, MergeConf) of + case update(Type, Name, MergeConf) of {ok, #{raw_config := _RawConf}} -> crud_listeners_by_id(get, #{bindings => #{id => Id}}); {error, not_found} -> @@ -412,7 +419,7 @@ crud_listeners_by_id(post, #{body := Body}) -> create_listener(Body); crud_listeners_by_id(delete, #{bindings := #{id := Id}}) -> {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), - case ensure_remove([listeners, Type, Name]) of + case ensure_remove(Type, Name) of {ok, _} -> {204}; {error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}} end. @@ -457,12 +464,11 @@ restart_listeners_by_id(Method, Body = #{bindings := Bindings}) -> action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) -> {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id), - Path = [listeners, Type, Name], - case emqx_conf:get_raw(Path, undefined) of + case get_raw(Type, Name) of undefined -> {404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}}; _PrevConf -> - case action(Path, Action, enabled(Action)) of + case action(Type, Name, Action, enabled(Action)) of {ok, #{raw_config := _RawConf}} -> {200}; {error, not_found} -> @@ -634,23 +640,6 @@ max_conn(_Int1, <<"infinity">>) -> <<"infinity">>; max_conn(<<"infinity">>, _Int) -> <<"infinity">>; max_conn(Int1, Int2) -> Int1 + Int2. -update(Path, Conf) -> - wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))). - -action(Path, Action, Conf) -> - wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))). - -create(Path, Conf) -> - wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))). - -ensure_remove(Path) -> - wrap(emqx_conf:tombstone(Path, ?OPTS(cluster))). - -wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; -wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; -wrap({error, Reason}) -> {error, Reason}; -wrap(Ok) -> Ok. - listener_type_status_example() -> [ #{ @@ -813,8 +802,7 @@ tcp_schema_example() -> create_listener(Body) -> case parse_listener_conf(Body) of {Id, Type, Name, Conf} -> - Path = [listeners, Type, Name], - case create(Path, Conf) of + case create(Type, Name, Conf) of {ok, #{raw_config := _RawConf}} -> crud_listeners_by_id(get, #{bindings => #{id => Id}}); {error, already_exist} -> diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index ffb41179f..4fe47cf93 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -17,6 +17,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-behaviour(emqx_db_backup). + %% API -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). @@ -28,12 +30,15 @@ update/4, delete/1, list/0, - init_bootstrap_file/0 + init_bootstrap_file/0, + format/1 ]). -export([authorize/3]). -export([post_config_update/5]). +-export([backup_tables/0]). + %% Internal exports (RPC) -export([ do_update/4, @@ -67,6 +72,12 @@ mnesia(boot) -> {attributes, record_info(fields, ?APP)} ]). +%%-------------------------------------------------------------------- +%% Data backup +%%-------------------------------------------------------------------- + +backup_tables() -> [?APP]. + post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) -> #{bootstrap_file := File} = NewConf, case init_bootstrap_file(File) of @@ -127,6 +138,17 @@ do_delete(Name) -> [_App] -> mnesia:delete({?APP, Name}) end. +format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) -> + ExpiredAt = + case ExpiredAt0 of + infinity -> <<"infinity">>; + _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0)) + end, + App#{ + expired_at => ExpiredAt, + created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt)) + }. + list() -> to_map(ets:match_object(?APP, #?APP{_ = '_'})). diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 3ebf8e314..a3a643681 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -25,6 +25,7 @@ -include("emqx_mgmt.hrl"). -define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])). +-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}). -export([load/0]). @@ -44,7 +45,8 @@ log/1, authz/1, pem_cache/1, - olp/1 + olp/1, + data/1 ]). -define(PROC_INFOKEYS, [ @@ -739,6 +741,37 @@ olp(_) -> {"olp disable", "Disable overload protection"} ]). +%%-------------------------------------------------------------------- +%% @doc data Command + +data(["export"]) -> + case emqx_mgmt_data_backup:export(?DATA_BACKUP_OPTS) of + {ok, #{filename := Filename}} -> + emqx_ctl:print("Data has been successfully exported to ~s.~n", [Filename]); + {error, Reason} -> + Reason1 = emqx_mgmt_data_backup:format_error(Reason), + emqx_ctl:print("[error] Data export failed, reason: ~p.~n", [Reason1]) + end; +data(["import", Filename]) -> + case emqx_mgmt_data_backup:import(Filename, ?DATA_BACKUP_OPTS) of + {ok, #{db_errors := DbErrs, config_errors := ConfErrs}} when + map_size(DbErrs) =:= 0, map_size(ConfErrs) =:= 0 + -> + emqx_ctl:print("Data has been imported successfully.~n"); + {ok, _} -> + emqx_ctl:print( + "Data has been imported, but some errors occurred, see the the log above.~n" + ); + {error, Reason} -> + Reason1 = emqx_mgmt_data_backup:format_error(Reason), + emqx_ctl:print("[error] Data import failed, reason: ~p.~n", [Reason1]) + end; +data(_) -> + emqx_ctl:usage([ + {"data import ", "Import data from the specified tar archive file"}, + {"data export", "Export data"} + ]). + %%-------------------------------------------------------------------- %% Dump ETS %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl new file mode 100644 index 000000000..5e59bd057 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -0,0 +1,690 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_data_backup). + +-export([ + export/0, + export/1, + import/1, + import/2, + format_error/1 +]). + +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +-include_lib("kernel/include/file.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-define(ROOT_BACKUP_DIR, "backup"). +-define(BACKUP_MNESIA_DIR, "mnesia"). +-define(TAR_SUFFIX, ".tar.gz"). +-define(META_FILENAME, "META.hocon"). +-define(CLUSTER_HOCON_FILENAME, "cluster.hocon"). +-define(CONF_KEYS, [ + <<"delayed">>, + <<"rewrite">>, + <<"retainer">>, + <<"mqtt">>, + <<"alarm">>, + <<"sysmon">>, + <<"sys_topics">>, + <<"limiter">>, + <<"log">>, + <<"persistent_session_store">>, + <<"prometheus">>, + <<"crl_cache">>, + <<"conn_congestion">>, + <<"force_shutdown">>, + <<"flapping_detect">>, + <<"broker">>, + <<"force_gc">>, + <<"zones">> +]). + +-define(DEFAULT_OPTS, #{}). +-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX). +-define(fmt_tar_err(_Expr_), + fun() -> + case _Expr_ of + {error, _Reason_} -> {error, erl_tar:format_error(_Reason_)}; + _Other_ -> _Other_ + end + end() +). + +-type backup_file_info() :: #{ + filename => binary(), + size => non_neg_integer(), + created_at => binary(), + node => node(), + atom() => _ +}. + +-type db_error_details() :: #{mria:table() => {error, _}}. +-type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}. + +%%------------------------------------------------------------------------------ +%% APIs +%%------------------------------------------------------------------------------ + +-spec export() -> {ok, backup_file_info()} | {error, _}. +export() -> + export(?DEFAULT_OPTS). + +-spec export(map()) -> {ok, backup_file_info()} | {error, _}. +export(Opts) -> + {BackupName, TarDescriptor} = prepare_new_backup(Opts), + try + do_export(BackupName, TarDescriptor, Opts) + catch + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "emqx_data_export_failed", + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + after + %% erl_tar:close/1 raises error if called on an already closed tar + catch erl_tar:close(TarDescriptor), + file:del_dir_r(BackupName) + end. + +-spec import(file:filename_all()) -> + {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} + | {error, _}. +import(BackupFileName) -> + import(BackupFileName, ?DEFAULT_OPTS). + +-spec import(file:filename_all(), map()) -> + {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}} + | {error, _}. +import(BackupFileName, Opts) -> + case is_import_allowed() of + true -> + case lookup_file(str(BackupFileName)) of + {ok, FilePath} -> + do_import(FilePath, Opts); + Err -> + Err + end; + false -> + {error, not_core_node} + end. + +format_error(not_core_node) -> + str( + io_lib:format( + "backup data import is only allowed on core EMQX nodes, but requested node ~p is not core", + [node()] + ) + ); +format_error(ee_to_ce_backup) -> + "importing EMQX Enterprise data backup to EMQX is not allowed"; +format_error(missing_backup_meta) -> + "invalid backup archive file: missing " ?META_FILENAME; +format_error(invalid_edition) -> + "invalid backup archive content: wrong EMQX edition value in " ?META_FILENAME; +format_error(invalid_version) -> + "invalid backup archive content: wrong EMQX version value in " ?META_FILENAME; +format_error(bad_archive_dir) -> + "invalid backup archive content: all files in the archive must be under directory"; +format_error(not_found) -> + "backup file not found"; +format_error(bad_backup_name) -> + "invalid backup name: file name must have " ?TAR_SUFFIX " extension"; +format_error({unsupported_version, ImportVersion}) -> + str( + io_lib:format( + "[warning] Backup version ~p is newer than EMQX version ~p, import is not allowed.~n", + [str(ImportVersion), str(emqx_release:version())] + ) + ); +format_error(Reason) -> + Reason. + +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + +prepare_new_backup(Opts) -> + Ts = erlang:system_time(millisecond), + {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts), + BackupBaseName = str( + io_lib:format( + "emqx-export-~0p-~2..0b-~2..0b-~2..0b-~2..0b-~2..0b.~3..0b", + [Y, M, D, HH, MM, SS, Ts rem 1000] + ) + ), + BackupName = filename:join(root_backup_dir(), BackupBaseName), + BackupTarName = ?tar(BackupName), + maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts), + {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])), + {BackupName, TarDescriptor}. + +do_export(BackupName, TarDescriptor, Opts) -> + BackupBaseName = filename:basename(BackupName), + BackupTarName = ?tar(BackupName), + Meta = #{ + version => emqx_release:version(), + edition => emqx_release:edition() + }, + MetaBin = bin(hocon_pp:do(Meta, #{})), + MetaFileName = filename:join(BackupBaseName, ?META_FILENAME), + + ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MetaBin, MetaFileName, [])), + ok = export_cluster_hocon(TarDescriptor, BackupBaseName, Opts), + ok = export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts), + ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)), + {ok, #file_info{ + size = Size, + ctime = {{Y1, M1, D1}, {H1, MM1, S1}} + }} = file:read_file_info(BackupTarName), + CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]), + {ok, #{ + filename => bin(BackupTarName), + size => Size, + created_at => bin(CreatedAt), + node => node() + }}. + +export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) -> + maybe_print("Exporting cluster configuration...~n", [], Opts), + RawConf = emqx_config:read_override_conf(#{override_to => cluster}), + maybe_print( + "Exporting additional files from EMQX data_dir: ~p...~n", [str(emqx:data_dir())], Opts + ), + RawConf1 = read_data_files(RawConf), + RawConfBin = bin(hocon_pp:do(RawConf1, #{})), + NameInArchive = filename:join(BackupBaseName, ?CLUSTER_HOCON_FILENAME), + ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, RawConfBin, NameInArchive, [])). + +export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) -> + maybe_print("Exporting built-in database...~n", [], Opts), + lists:foreach( + fun(Tab) -> export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts) end, + tabs_to_backup() + ). + +export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) -> + maybe_print("Exporting ~p database table...~n", [TabName], Opts), + {ok, MnesiaBackupName} = do_export_mnesia_tab(TabName, BackupName), + NameInArchive = mnesia_backup_name(BackupBaseName, TabName), + ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MnesiaBackupName, NameInArchive, [])), + _ = file:delete(MnesiaBackupName), + ok. + +do_export_mnesia_tab(TabName, BackupName) -> + Node = node(), + try + {ok, TabName, [Node]} = mnesia:activate_checkpoint( + [{name, TabName}, {min, [TabName]}, {allow_remote, false}] + ), + MnesiaBackupName = mnesia_backup_name(BackupName, TabName), + ok = filelib:ensure_dir(MnesiaBackupName), + ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName), + {ok, MnesiaBackupName} + after + mnesia:deactivate_checkpoint(TabName) + end. + +-ifdef(TEST). +tabs_to_backup() -> + %% Allow mocking in tests + ?MODULE:mnesia_tabs_to_backup(). +-else. +tabs_to_backup() -> + mnesia_tabs_to_backup(). +-endif. + +mnesia_tabs_to_backup() -> + lists:flatten([M:backup_tables() || M <- find_behaviours(emqx_db_backup)]). + +mnesia_backup_name(Path, TabName) -> + filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]). + +is_import_allowed() -> + mria_rlog:role() =:= core. + +validate_backup(BackupDir) -> + case hocon:files([filename:join(BackupDir, ?META_FILENAME)]) of + {ok, #{ + <<"edition">> := Edition, + <<"version">> := Version + }} = Meta -> + validate( + [ + fun() -> check_edition(Edition) end, + fun() -> check_version(Version) end + ], + Meta + ); + _ -> + ?SLOG(error, #{msg => "missing_backup_meta", backup => BackupDir}), + {error, missing_backup_meta} + end. + +validate([ValidatorFun | T], OkRes) -> + case ValidatorFun() of + ok -> validate(T, OkRes); + Err -> Err + end; +validate([], OkRes) -> + OkRes. + +check_edition(BackupEdition) when BackupEdition =:= <<"ce">>; BackupEdition =:= <<"ee">> -> + Edition = bin(emqx_release:edition()), + case {BackupEdition, Edition} of + {<<"ee">>, <<"ce">>} -> + {error, ee_to_ce_backup}; + _ -> + ok + end; +check_edition(BackupEdition) -> + ?SLOG(error, #{msg => "invalid_backup_edition", edition => BackupEdition}), + {error, invalid_edition}. + +check_version(ImportVersion) -> + case parse_version_no_patch(ImportVersion) of + {ok, {ImportMajorInt, ImportMinorInt}} -> + Version = emqx_release:version(), + {ok, {MajorInt, MinorInt}} = parse_version_no_patch(bin(Version)), + case ImportMajorInt > MajorInt orelse ImportMinorInt > MinorInt of + true -> + %% 4.x backup files are anyway not compatible and will be treated as invalid, + %% before this step, + {error, {unsupported_version, str(ImportVersion)}}; + false -> + ok + end; + Err -> + Err + end. + +parse_version_no_patch(VersionBin) -> + case string:split(VersionBin, ".", all) of + [Major, Minor | _] -> + {MajorInt, _} = emqx_utils_binary:bin_to_int(Major), + {MinorInt, _} = emqx_utils_binary:bin_to_int(Minor), + {ok, {MajorInt, MinorInt}}; + _ -> + ?SLOG(error, #{msg => "failed_to_parse_backup_version", version => VersionBin}), + {error, invalid_version} + end. + +do_import(BackupFileName, Opts) -> + BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)), + maybe_print("Importing data from ~p...~n", [BackupFileName], Opts), + try + ok = validate_backup_name(BackupFileName), + ok = extract_backup(BackupFileName), + {ok, _} = validate_backup(BackupDir), + ConfErrors = import_cluster_hocon(BackupDir, Opts), + MnesiaErrors = import_mnesia_tabs(BackupDir, Opts), + ?SLOG(info, #{msg => "emqx_data_import_success"}), + {ok, #{db_errors => MnesiaErrors, config_errors => ConfErrors}} + catch + error:{badmatch, {error, Reason}}:Stack -> + ?SLOG(error, #{msg => "emqx_data_import_failed", reason => Reason, stacktrace => Stack}), + {error, Reason}; + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "emqx_data_import_failed", + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + after + file:del_dir_r(BackupDir) + end. + +import_mnesia_tabs(BackupDir, Opts) -> + maybe_print("Importing built-in database...~n", [], Opts), + filter_errors( + lists:foldr( + fun(Tab, Acc) -> Acc#{Tab => import_mnesia_tab(BackupDir, Tab, Opts)} end, + #{}, + tabs_to_backup() + ) + ). + +import_mnesia_tab(BackupDir, TabName, Opts) -> + MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), + case filelib:is_regular(MnesiaBackupFileName) of + true -> + maybe_print("Importing ~p database table...~n", [TabName], Opts), + restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts); + false -> + maybe_print("No backup file for ~p database table...~n", [TabName], Opts), + ?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}), + ok + end. + +restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> + BackupNameToImport = MnesiaBackupFileName ++ "_for_import", + Prepared = + catch mnesia:traverse_backup( + MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0 + ), + try + case Prepared of + {ok, _} -> + Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]), + case Restored of + {atomic, [TabName]} -> + ok; + RestoreErr -> + ?SLOG(error, #{ + msg => "failed_to_restore_mnesia_backup", + table => TabName, + backup => BackupDir, + reason => RestoreErr + }), + maybe_print_mnesia_import_err(TabName, RestoreErr, Opts), + {error, RestoreErr} + end; + PrepareErr -> + ?SLOG(error, #{ + msg => "failed_to_prepare_mnesia_backup_for_restoring", + table => TabName, + backup => BackupDir, + reason => PrepareErr + }), + maybe_print_mnesia_import_err(TabName, PrepareErr, Opts), + PrepareErr + end + after + %% Cleanup files as soon as they are not needed any more for more efficient disk usage + _ = file:delete(BackupNameToImport), + _ = file:delete(MnesiaBackupFileName) + end. + +backup_converter({schema, Tab, CreateList}, Acc) -> + check_rec_attributes(Tab, CreateList), + {[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc}; +backup_converter(Other, Acc) -> + {[Other], Acc}. + +check_rec_attributes(Tab, CreateList) -> + ImportAttributes = proplists:get_value(attributes, CreateList), + Attributes = mnesia:table_info(Tab, attributes), + case ImportAttributes =/= Attributes of + true -> + throw({error, different_table_schema}); + false -> + ok + end. + +convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies -> + {K, [node()]}; +convert_copies(Other) -> + Other. + +extract_backup(BackupFileName) -> + BackupDir = root_backup_dir(), + ok = validate_filenames(BackupFileName), + ?fmt_tar_err(erl_tar:extract(BackupFileName, [{cwd, BackupDir}, compressed])). + +validate_filenames(BackupFileName) -> + {ok, FileNames} = ?fmt_tar_err(erl_tar:table(BackupFileName, [compressed])), + BackupName = filename:basename(BackupFileName, ?TAR_SUFFIX), + IsValid = lists:all( + fun(FileName) -> + [Root | _] = filename:split(FileName), + Root =:= BackupName + end, + FileNames + ), + case IsValid of + true -> ok; + false -> {error, bad_archive_dir} + end. + +import_cluster_hocon(BackupDir, Opts) -> + HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME), + case filelib:is_regular(HoconFileName) of + true -> + {ok, RawConf} = hocon:files([HoconFileName]), + {ok, _} = validate_cluster_hocon(RawConf), + maybe_print("Importing cluster configuration...~n", [], Opts), + %% At this point, when all validations have been passed, we want to log errors (if any) + %% but proceed with the next items, instead of aborting the whole import operation + do_import_conf(RawConf, Opts); + false -> + maybe_print("No cluster configuration to be imported.~n", [], Opts), + ?SLOG(info, #{ + msg => "no_backup_hocon_config_to_import", + backup => BackupDir + }), + #{} + end. + +read_data_files(RawConf) -> + DataDir = bin(emqx:data_dir()), + {ok, Cwd} = file:get_cwd(), + AbsDataDir = bin(filename:join(Cwd, DataDir)), + RawConf1 = emqx_authz:maybe_read_acl_file(RawConf), + emqx_utils_maps:deep_convert(RawConf1, fun read_data_file/4, [DataDir, AbsDataDir]). + +-define(dir_pattern(_Dir_), <<_Dir_:(byte_size(_Dir_))/binary, _/binary>>). + +read_data_file(Key, Val, DataDir, AbsDataDir) -> + Val1 = + case Val of + ?dir_pattern(DataDir) = FileName -> + do_read_file(FileName); + ?dir_pattern(AbsDataDir) = FileName -> + do_read_file(FileName); + V -> + V + end, + {Key, Val1}. + +do_read_file(FileName) -> + case file:read_file(FileName) of + {ok, Content} -> + Content; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_read_data_file", + filename => FileName, + reason => Reason + }), + FileName + end. + +validate_cluster_hocon(RawConf) -> + %% write ACL file to comply with the schema... + RawConf1 = emqx_authz:maybe_write_acl_file(RawConf), + emqx_hocon:check( + emqx_conf:schema_module(), + maps:merge(emqx:get_raw_config([]), RawConf1), + #{atom_key => false, required => false} + ). + +do_import_conf(RawConf, Opts) -> + GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))), + maybe_print_errors(GenConfErrs, Opts), + Errors = + lists:foldr( + fun(Module, ErrorsAcc) -> + Module:import_config(RawConf), + case Module:import_config(RawConf) of + {ok, #{changed := Changed}} -> + maybe_print_changed(Changed, Opts), + ErrorsAcc; + {error, #{root_key := RootKey, reason := Reason}} -> + ErrorsAcc#{[RootKey] => Reason} + end + end, + GenConfErrs, + find_behaviours(emqx_config_backup) + ), + maybe_print_errors(Errors, Opts), + Errors. + +import_generic_conf(Data) -> + lists:map( + fun(Key) -> + case maps:get(Key, Data, undefined) of + undefined -> {[Key], ok}; + Conf -> {[Key], emqx_conf:update([Key], Conf, #{override_to => cluster})} + end + end, + ?CONF_KEYS + ). + +maybe_print_changed(Changed, Opts) -> + lists:foreach( + fun(ChangedPath) -> + maybe_print( + "Config key path ~p was present before import and " + "has been overwritten.~n", + [pretty_path(ChangedPath)], + Opts + ) + end, + Changed + ). + +maybe_print_errors(Errors, Opts) -> + maps:foreach( + fun(Path, Err) -> + maybe_print( + "Failed to import the following config path: ~p, reason: ~p~n", + [pretty_path(Path), Err], + Opts + ) + end, + Errors + ). + +filter_errors(Results) -> + maps:filter( + fun + (_Path, {error, _}) -> true; + (_, _) -> false + end, + Results + ). + +pretty_path(Path) -> + str(lists:join(".", [str(Part) || Part <- Path])). + +str(Data) when is_atom(Data) -> + atom_to_list(Data); +str(Data) -> + unicode:characters_to_list(Data). + +bin(Data) when is_atom(Data) -> + atom_to_binary(Data, utf8); +bin(Data) -> + unicode:characters_to_binary(Data). + +validate_backup_name(FileName) -> + BaseName = filename:basename(FileName, ?TAR_SUFFIX), + ValidName = BaseName ++ ?TAR_SUFFIX, + case filename:basename(FileName) of + ValidName -> ok; + _ -> {error, bad_backup_name} + end. + +lookup_file(FileName) -> + case filelib:is_regular(FileName) of + true -> + {ok, FileName}; + false -> + %% Only lookup by basename, don't allow to lookup by file path + case FileName =:= filename:basename(FileName) of + true -> + FilePath = filename:join(root_backup_dir(), FileName), + case filelib:is_file(FilePath) of + true -> {ok, FilePath}; + false -> {error, not_found} + end; + false -> + {error, not_found} + end + end. + +root_backup_dir() -> + Dir = filename:join(emqx:data_dir(), ?ROOT_BACKUP_DIR), + ok = ensure_path(Dir), + Dir. + +-if(?OTP_RELEASE < 25). +ensure_path(Path) -> filelib:ensure_dir(filename:join([Path, "dummy"])). +-else. +ensure_path(Path) -> filelib:ensure_path(Path). +-endif. + +local_datetime(MillisecondTs) -> + calendar:system_time_to_local_time(MillisecondTs, millisecond). + +maybe_print(Format, Args, #{print_fun := PrintFun}) -> + PrintFun(Format, Args); +maybe_print(_Format, _Args, _Opts) -> + ok. + +maybe_print_mnesia_import_err(TabName, Error, Opts) -> + maybe_print( + "[error] Failed to import built-in database table: ~p, reason: ~p~n", + [TabName, Error], + Opts + ). + +find_behaviours(Behaviour) -> + find_behaviours(Behaviour, apps(), []). + +%% Based on minirest_api:find_api_modules/1 +find_behaviours(_Behaviour, [] = _Apps, Acc) -> + Acc; +find_behaviours(Behaviour, [App | Apps], Acc) -> + case application:get_key(App, modules) of + undefined -> + Acc; + {ok, Modules} -> + NewAcc = lists:filter( + fun(Module) -> + Info = Module:module_info(attributes), + Bhvrs = lists:flatten( + proplists:get_all_values(behavior, Info) ++ + proplists:get_all_values(behaviour, Info) + ), + lists:member(Behaviour, Bhvrs) + end, + Modules + ), + find_behaviours(Behaviour, Apps, NewAcc ++ Acc) + end. + +apps() -> + [ + App + || {App, _, _} <- application:loaded_applications(), + case re:run(atom_to_list(App), "^emqx") of + {match, [{0, 4}]} -> true; + _ -> false + end + ]. diff --git a/apps/emqx_management/src/emqx_mgmt_listeners_conf.erl b/apps/emqx_management/src/emqx_mgmt_listeners_conf.erl new file mode 100644 index 000000000..f54aca845 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_listeners_conf.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_listeners_conf). + +-behaviour(emqx_config_backup). + +-export([ + action/4, + create/3, + ensure_remove/2, + get_raw/2, + update/3 +]). + +%% Data backup +-export([ + import_config/1 +]). + +-include_lib("emqx/include/logger.hrl"). + +-define(CONF_ROOT_KEY, listeners). +-define(path(_Type_, _Name_), [?CONF_ROOT_KEY, _Type_, _Name_]). +-define(OPTS, #{rawconf_with_defaults => true, override_to => cluster}). +-define(IMPORT_OPTS, #{override_to => cluster}). + +action(Type, Name, Action, Conf) -> + wrap(emqx_conf:update(?path(Type, Name), {action, Action, Conf}, ?OPTS)). + +create(Type, Name, Conf) -> + wrap(emqx_conf:update(?path(Type, Name), {create, Conf}, ?OPTS)). + +ensure_remove(Type, Name) -> + wrap(emqx_conf:tombstone(?path(Type, Name), ?OPTS)). + +get_raw(Type, Name) -> emqx_conf:get_raw(?path(Type, Name), undefined). + +update(Type, Name, Conf) -> + wrap(emqx_conf:update(?path(Type, Name), {update, Conf}, ?OPTS)). + +wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason}; +wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason}; +wrap({error, Reason}) -> {error, Reason}; +wrap(Ok) -> Ok. + +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +import_config(RawConf) -> + NewConf = maps:get(<<"listeners">>, RawConf, #{}), + OldConf = emqx:get_raw_config([?CONF_ROOT_KEY], #{}), + MergedConf = merge_confs(OldConf, NewConf), + case emqx_conf:update([?CONF_ROOT_KEY], MergedConf, ?IMPORT_OPTS) of + {ok, #{raw_config := NewRawConf}} -> + {ok, #{root_key => ?CONF_ROOT_KEY, changed => changed_paths(OldConf, NewRawConf)}}; + Error -> + {error, #{root_key => ?CONF_ROOT_KEY, reason => Error}} + end. + +merge_confs(OldConf, NewConf) -> + AllTypes = maps:keys(maps:merge(OldConf, NewConf)), + lists:foldr( + fun(Type, Acc) -> + NewListeners = maps:get(Type, NewConf, #{}), + OldListeners = maps:get(Type, OldConf, #{}), + Acc#{Type => maps:merge(OldListeners, NewListeners)} + end, + #{}, + AllTypes + ). + +changed_paths(OldRawConf, NewRawConf) -> + maps:fold( + fun(Type, Listeners, ChangedAcc) -> + OldListeners = maps:get(Type, OldRawConf, #{}), + Changed = maps:get(changed, emqx_utils_maps:diff_maps(Listeners, OldListeners)), + [?path(Type, K) || K <- maps:keys(Changed)] ++ ChangedAcc + end, + [], + NewRawConf + ). diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl new file mode 100644 index 000000000..9df6d2138 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -0,0 +1,519 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mgmt_data_backup_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + [application:load(App) || App <- apps_to_start() ++ apps_to_load()], + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(t_import_on_cluster, Config) -> + %% Don't import listeners to avoid port conflicts + %% when the same conf will be imported to another cluster + meck:new(emqx_mgmt_listeners_conf, [passthrough]), + meck:new(emqx_gateway_conf, [passthrough]), + meck:expect( + emqx_mgmt_listeners_conf, + import_config, + 1, + {ok, #{changed => [], root_key => listeners}} + ), + meck:expect( + emqx_gateway_conf, + import_config, + 1, + {ok, #{changed => [], root_key => gateway}} + ), + [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) -> + [{cluster, cluster(Config)} | setup(Config)]; +init_per_testcase(t_mnesia_bad_tab_schema, Config) -> + meck:new(emqx_mgmt_data_backup, [passthrough]), + meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]), + setup(Config); +init_per_testcase(_TestCase, Config) -> + setup(Config). + +end_per_testcase(t_import_on_cluster, Config) -> + cleanup_cluster(?config(cluster, Config)), + cleanup(Config), + meck:unload(emqx_mgmt_listeners_conf), + meck:unload(emqx_gateway_conf); +end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) -> + cleanup_cluster(?config(cluster, Config)), + cleanup(Config); +end_per_testcase(t_mnesia_bad_tab_schema, Config) -> + cleanup(Config), + meck:unload(emqx_mgmt_data_backup); +end_per_testcase(_TestCase, Config) -> + cleanup(Config). + +t_empty_export_import(_Config) -> + ExpRawConf = emqx:get_raw_config([]), + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(ExpRawConf, emqx:get_raw_config([])), + %% idempotent update assert + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(ExpRawConf, emqx:get_raw_config([])). + +t_cluster_hocon_export_import(Config) -> + RawConfBeforeImport = emqx:get_raw_config([]), + BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BootstrapFile)), + RawConfAfterImport = emqx:get_raw_config([]), + ?assertNotEqual(RawConfBeforeImport, RawConfAfterImport), + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])), + %% idempotent update assert + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), + ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])), + %% lookup file inside /backup + ?assertEqual(Exp, emqx_mgmt_data_backup:import(filename:basename(FileName))). + +t_ee_to_ce_backup(Config) -> + case emqx_release:edition() of + ce -> + EEBackupFileName = filename:join(?config(priv_dir, Config), "export-backup-ee.tar.gz"), + Meta = unicode:characters_to_binary( + hocon_pp:do(#{edition => ee, version => emqx_release:version()}, #{}) + ), + ok = erl_tar:create( + EEBackupFileName, + [ + {"export-backup-ee/cluster.hocon", <<>>}, + {"export-backup-ee/META.hocon", Meta} + ], + [compressed] + ), + ExpReason = ee_to_ce_backup, + ?assertEqual( + {error, ExpReason}, emqx_mgmt_data_backup:import(EEBackupFileName) + ), + %% Must be translated to a readable string + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason)); + ee -> + %% Don't fail if the test is run with emqx-enterprise profile + ok + end. + +t_no_backup_file(_Config) -> + ExpReason = not_found, + ?assertEqual( + {error, not_found}, emqx_mgmt_data_backup:import("no_such_backup.tar.gz") + ), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason)). + +t_bad_backup_file(Config) -> + BadFileName = filename:join(?config(priv_dir, Config), "export-bad-backup-tar-gz"), + ok = file:write_file(BadFileName, <<>>), + NoMetaFileName = filename:join(?config(priv_dir, Config), "export-no-meta.tar.gz"), + ok = erl_tar:create(NoMetaFileName, [{"export-no-meta/cluster.hocon", <<>>}], [compressed]), + BadArchiveDirFileName = filename:join(?config(priv_dir, Config), "export-bad-dir.tar.gz"), + ok = erl_tar:create( + BadArchiveDirFileName, + [ + {"tmp/cluster.hocon", <<>>}, + {"export-bad-dir-inside/META.hocon", <<>>}, + {"/export-bad-dir-inside/mnesia/test_tab", <<>>} + ], + [compressed] + ), + InvalidEditionFileName = filename:join( + ?config(priv_dir, Config), "export-invalid-edition.tar.gz" + ), + Meta = unicode:characters_to_binary( + hocon_pp:do(#{edition => "test", version => emqx_release:version()}, #{}) + ), + ok = erl_tar:create( + InvalidEditionFileName, + [ + {"export-invalid-edition/cluster.hocon", <<>>}, + {"export-invalid-edition/META.hocon", Meta} + ], + [compressed] + ), + InvalidVersionFileName = filename:join( + ?config(priv_dir, Config), "export-invalid-version.tar.gz" + ), + Meta1 = unicode:characters_to_binary( + hocon_pp:do(#{edition => emqx_release:edition(), version => "test"}, #{}) + ), + ok = erl_tar:create( + InvalidVersionFileName, + [ + {"export-invalid-version/cluster.hocon", <<>>}, + {"export-invalid-version/META.hocon", Meta1} + ], + [compressed] + ), + BadFileNameReason = bad_backup_name, + NoMetaReason = missing_backup_meta, + BadArchiveDirReason = bad_archive_dir, + InvalidEditionReason = invalid_edition, + InvalidVersionReason = invalid_version, + ?assertEqual({error, BadFileNameReason}, emqx_mgmt_data_backup:import(BadFileName)), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadFileNameReason)), + ?assertEqual({error, NoMetaReason}, emqx_mgmt_data_backup:import(NoMetaFileName)), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(NoMetaReason)), + ?assertEqual( + {error, BadArchiveDirReason}, + emqx_mgmt_data_backup:import(BadArchiveDirFileName) + ), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadArchiveDirReason)), + ?assertEqual( + {error, InvalidEditionReason}, + emqx_mgmt_data_backup:import(InvalidEditionFileName) + ), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidEditionReason)), + ?assertEqual( + {error, InvalidVersionReason}, + emqx_mgmt_data_backup:import(InvalidVersionFileName) + ), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidVersionReason)). + +t_future_version(Config) -> + CurrentVersion = list_to_binary(emqx_release:version()), + [_, _ | Patch] = string:split(CurrentVersion, ".", all), + {ok, {MajorInt, MinorInt}} = emqx_mgmt_data_backup:parse_version_no_patch(CurrentVersion), + FutureMajorVersion = recompose_version(MajorInt + 1, MinorInt, Patch), + FutureMinorVersion = recompose_version(MajorInt, MinorInt + 1, Patch), + [MajorMeta, MinorMeta] = + [ + unicode:characters_to_binary( + hocon_pp:do(#{edition => emqx_release:edition(), version => V}, #{}) + ) + || V <- [FutureMajorVersion, FutureMinorVersion] + ], + MajorFileName = filename:join(?config(priv_dir, Config), "export-future-major-ver.tar.gz"), + MinorFileName = filename:join(?config(priv_dir, Config), "export-future-minor-ver.tar.gz"), + ok = erl_tar:create( + MajorFileName, + [ + {"export-future-major-ver/cluster.hocon", <<>>}, + {"export-future-major-ver/META.hocon", MajorMeta} + ], + [compressed] + ), + ok = erl_tar:create( + MinorFileName, + [ + {"export-future-minor-ver/cluster.hocon", <<>>}, + {"export-future-minor-ver/META.hocon", MinorMeta} + ], + [compressed] + ), + ExpMajorReason = {unsupported_version, FutureMajorVersion}, + ExpMinorReason = {unsupported_version, FutureMinorVersion}, + ?assertEqual({error, ExpMajorReason}, emqx_mgmt_data_backup:import(MajorFileName)), + ?assertEqual({error, ExpMinorReason}, emqx_mgmt_data_backup:import(MinorFileName)), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMajorReason)), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMinorReason)). + +t_bad_config(Config) -> + BadConfigFileName = filename:join(?config(priv_dir, Config), "export-bad-config-backup.tar.gz"), + Meta = unicode:characters_to_binary( + hocon_pp:do(#{edition => emqx_release:edition(), version => emqx_release:version()}, #{}) + ), + BadConfigMap = #{ + <<"listeners">> => + #{ + <<"bad-type">> => + #{<<"bad-name">> => #{<<"bad-field">> => <<"bad-val">>}} + } + }, + BadConfig = unicode:characters_to_binary(hocon_pp:do(BadConfigMap, #{})), + ok = erl_tar:create( + BadConfigFileName, + [ + {"export-bad-config-backup/cluster.hocon", BadConfig}, + {"export-bad-config-backup/META.hocon", Meta} + ], + [compressed] + ), + Res = emqx_mgmt_data_backup:import(BadConfigFileName), + ?assertMatch({error, #{kind := validation_error}}, Res). + +t_import_on_cluster(Config) -> + %% Randomly chosen config key to verify import result additionally + ?assertEqual([], emqx:get_config([authentication])), + BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), + ExpImportRes = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(ExpImportRes, emqx_mgmt_data_backup:import(BootstrapFile)), + ImportedAuthnConf = emqx:get_config([authentication]), + ?assertMatch([_ | _], ImportedAuthnConf), + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + {ok, Cwd} = file:get_cwd(), + AbsFilePath = filename:join(Cwd, FileName), + [CoreNode1, _CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config), + ReplImportReason = not_core_node, + ?assertEqual( + {error, ReplImportReason}, + rpc:call(ReplicantNode, emqx_mgmt_data_backup, import, [AbsFilePath]) + ), + ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ReplImportReason)), + [?assertEqual([], rpc:call(N, emqx, get_config, [[authentication]])) || N <- NodesList], + ?assertEqual( + ExpImportRes, + rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath]) + ), + [ + ?assertEqual( + authn_ids(ImportedAuthnConf), + authn_ids(rpc:call(N, emqx, get_config, [[authentication]])) + ) + || N <- NodesList + ]. + +t_verify_imported_mnesia_tab_on_cluster(Config) -> + UsersToExport = users(<<"user_to_export_">>), + UsersBeforeImport = users(<<"user_before_import_">>), + [{ok, _} = emqx_dashboard_admin:add_user(U, U, U) || U <- UsersToExport], + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + {ok, Cwd} = file:get_cwd(), + AbsFilePath = filename:join(Cwd, FileName), + + [CoreNode1, CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config), + + [ + {ok, _} = rpc:call(CoreNode1, emqx_dashboard_admin, add_user, [U, U, U]) + || U <- UsersBeforeImport + ], + + ?assertEqual( + {ok, #{db_errors => #{}, config_errors => #{}}}, + rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath]) + ), + + [Tab] = emqx_dashboard_admin:backup_tables(), + AllUsers = lists:sort(mnesia:dirty_all_keys(Tab) ++ UsersBeforeImport), + [ + ?assertEqual( + AllUsers, + lists:sort(rpc:call(N, mnesia, dirty_all_keys, [Tab])) + ) + || N <- [CoreNode1, CoreNode2] + ], + + %% Give some extra time to replicant to import data... + timer:sleep(3000), + ?assertEqual(AllUsers, lists:sort(rpc:call(ReplicantNode, mnesia, dirty_all_keys, [Tab]))), + + [rpc:call(N, ekka, leave, []) || N <- lists:reverse(NodesList)], + [emqx_common_test_helpers:stop_slave(N) || N <- NodesList]. + +t_mnesia_bad_tab_schema(_Config) -> + OldAttributes = [id, name, description], + ok = create_test_tab(OldAttributes), + ok = mria:dirty_write({data_backup_test, <<"id">>, <<"old_name">>, <<"old_description">>}), + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + {atomic, ok} = mnesia:delete_table(data_backup_test), + NewAttributes = [id, name, description, new_field], + ok = create_test_tab(NewAttributes), + NewRec = + {data_backup_test, <<"id">>, <<"new_name">>, <<"new_description">>, <<"new_field_value">>}, + ok = mria:dirty_write(NewRec), + ?assertEqual( + {ok, #{ + db_errors => + #{data_backup_test => {error, {"Backup traversal failed", different_table_schema}}}, + config_errors => #{} + }}, + emqx_mgmt_data_backup:import(FileName) + ), + ?assertEqual([NewRec], mnesia:dirty_read(data_backup_test, <<"id">>)), + ?assertEqual([<<"id">>], mnesia:dirty_all_keys(data_backup_test)). + +t_read_files(_Config) -> + DataDir = emqx:data_dir(), + %% Relative "data" path is set in init_per_testcase/2, asserting it must be safe + ?assertEqual("data", DataDir), + {ok, Cwd} = file:get_cwd(), + AbsDataDir = filename:join(Cwd, DataDir), + FileBaseName = "t_read_files_tmp_file", + TestFileAbsPath = iolist_to_binary(filename:join(AbsDataDir, FileBaseName)), + TestFilePath = iolist_to_binary(filename:join(DataDir, FileBaseName)), + TestFileContent = <<"test_file_content">>, + ok = file:write_file(TestFileAbsPath, TestFileContent), + + RawConf = #{ + <<"test_rootkey">> => #{ + <<"test_field">> => <<"test_field_path">>, + <<"abs_data_dir_path_file">> => TestFileAbsPath, + <<"rel_data_dir_path_file">> => TestFilePath, + <<"path_outside_data_dir">> => <<"/tmp/some-file">> + } + }, + + RawConf1 = emqx_utils_maps:deep_put( + [<<"test_rootkey">>, <<"abs_data_dir_path_file">>], RawConf, TestFileContent + ), + ExpectedConf = emqx_utils_maps:deep_put( + [<<"test_rootkey">>, <<"rel_data_dir_path_file">>], RawConf1, TestFileContent + ), + ?assertEqual(ExpectedConf, emqx_mgmt_data_backup:read_data_files(RawConf)). + +%%------------------------------------------------------------------------------ +%% Internal test helpers +%%------------------------------------------------------------------------------ + +setup(Config) -> + %% avoid port conflicts if the cluster is started + AppHandler = fun + (emqx_dashboard) -> + ok = emqx_config:put([dashboard, listeners, http, bind], 0); + (_) -> + ok + end, + ok = emqx_common_test_helpers:start_apps(apps_to_start(), AppHandler), + PrevDataDir = application:get_env(emqx, data_dir), + application:set_env(emqx, data_dir, "data"), + [{previous_emqx_data_dir, PrevDataDir} | Config]. + +cleanup(Config) -> + emqx_common_test_helpers:stop_apps(apps_to_start()), + case ?config(previous_emqx_data_dir, Config) of + undefined -> + application:unset_env(emqx, data_dir); + {ok, Val} -> + application:set_env(emqx, data_dir, Val) + end. + +cleanup_cluster(ClusterNodes) -> + [rpc:call(N, ekka, leave, []) || N <- lists:reverse(ClusterNodes)], + [emqx_common_test_helpers:stop_slave(N) || N <- ClusterNodes]. + +users(Prefix) -> + [ + <> + || _ <- lists:seq(1, 10) + ]. + +authn_ids(AuthnConf) -> + lists:sort([emqx_authentication:authenticator_id(Conf) || Conf <- AuthnConf]). + +recompose_version(MajorInt, MinorInt, Patch) -> + unicode:characters_to_list( + [integer_to_list(MajorInt + 1), $., integer_to_list(MinorInt), $. | Patch] + ). + +cluster(Config) -> + PrivDataDir = ?config(priv_dir, Config), + [{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] = + emqx_common_test_helpers:emqx_cluster( + [ + {core, data_backup_core1}, + {core, data_backup_core2}, + {replicant, data_backup_replicant} + ], + #{ + priv_data_dir => PrivDataDir, + schema_mod => emqx_conf_schema, + apps => apps_to_start(), + load_apps => apps_to_start() ++ apps_to_load(), + env => [{mria, db_backend, rlog}], + load_schema => true, + start_autocluster => true, + join_to => true, + listener_ports => [], + conf => [{[dashboard, listeners, http, bind], 0}], + env_handler => + fun(_) -> + application:set_env(emqx, boot_modules, [broker, router]) + end + } + ), + Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts), + Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts), + #{conf := _ReplConf, env := ReplEnv} = ReplOpts, + ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]}, + ReplOpts1 = maps:remove( + join_to, + ReplOpts#{ + env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv], + env_handler => fun(_) -> + application:set_env(emqx, boot_modules, [broker, router]), + application:set_env( + ekka, + cluster_discovery, + ClusterDiscovery + ) + end + } + ), + ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1), + [Node1, Node2, ReplNode]. + +create_test_tab(Attributes) -> + ok = mria:create_table(data_backup_test, [ + {type, set}, + {rlog_shard, data_backup_test_shard}, + {storage, disc_copies}, + {record_name, data_backup_test}, + {attributes, Attributes}, + {storage_properties, [ + {ets, [ + {read_concurrency, true}, + {write_concurrency, true} + ]} + ]} + ]), + ok = mria:wait_for_tables([data_backup_test]). + +apps_to_start() -> + [ + emqx, + emqx_conf, + emqx_psk, + emqx_management, + emqx_dashboard, + emqx_authz, + emqx_authn, + emqx_rule_engine, + emqx_retainer, + emqx_prometheus, + emqx_modules, + emqx_gateway, + emqx_exhook, + emqx_bridge, + emqx_auto_subscribe + ]. + +apps_to_load() -> + [ + emqx_gateway_lwm2m, + emqx_gateway_coap, + emqx_gateway_exproto, + emqx_gateway_stomp, + emqx_gateway_mqttsn + ]. diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-test-bootstrap-ce.tar.gz b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-test-bootstrap-ce.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..b7da76bbb0a8b1037f64e21304a4376f21ff08ab GIT binary patch literal 11822 zcmaLcQ;aUm+MwaqYTLGL+qR9jZQJH*_iEd=ZQHAD+nR6hO!hyM%$_>9b0wA3*>g}; z1ksQ{|6ZV%I+j(Wt7>+${PtbL!ptM!uHq)7l`M&_D#!6E6pyp zdNLaBJmivbu#zFe0cZo{fIyT82*^xHC0W5wNa?-=wBATeb~>p4b0Q>bozw&loCaxSUw(GF`?zJHPr zY2I^9I}W*)(QPlxY((KlSkfmy2*Tap2*AH7m^l+Ah3!{JYejEdeq=Ig*VXSMizw~s z0IQpSvqon6Px2bO_R2oF9;*gUNO95b{+?C0di6m(mMo;?`Y$H(#rCeVI^ew`#0w1g zgY5PJfA7F|kpD;sql{*5<;7y|-%aidz}m5YOF)r=?m@->g2d(S``Hw7$K&=9#jPvt zGu1m?9W!4xQ8u2eONpd@NW2Wf3B)iDWC8Bd+Gq5EPbZ5rogvy4mi~()Hu7NJ+*#g3rjXR zjlBLBKa%xC;4qM?*mtAmXN-`(+5gdaV{xYqU{>Vn0N9x&- z#tUZ47*1c}(PV0`&Sx@Bbl*0BWiQFF28D31W`F*+=f+BS*WL`7PHU+~N-)3lK2r=) zk4u@7p`QhNvx5OBpUY8qGeOVa-1tRKPk(*Vr|-eGFV+3-04;h0e)2ms^W(rXcs@Gt zVj{IIs%sCt0l$)qeB)Ubv{4B`hXNUf;%yq?`|VNaVn~nRvmb$kd&b~AxNPU{>K?>} zcZgsgrQ=&2{2O@KT{_k3Gzm^r!!&V}Nlfee_S9GuEejmfk-n+s1rkMqc-S9h{^caNbxXrNd5bW9Zph z{X|Z9Q)|ku@BadY#7h7ufJyhUREb$8LYTD%5Z;V(aOAQ-kAwC)zDT%PefPe9Kb-^` z{0$k3)VnYQdG0o>K-tN$nS+efe3|4WI?Th3A!?@*0vtwoicgq2mEwP#JNNqCClP#y z+V}3iI2pVb&itm_`J6nN@@oBfS|;duPQS4+ulIcfsmx$orrlK#YQTR}BV^j~7nC3t zj0x{IDIf%}ILL>9MGEfMXYUaqrSrNRDy;sz(|+8i|NeUJB=AoNI60^uJytFFrn&Jx z@ErcG)*F=61#rGvy=!ZL#D&L)I?5g<&@!h00nFy6r6G;czx&TgCV%ta{w-hiyLmD% zWnhKzzfbS&ycs|F*_+1@_?h(od8fedzLf6sR^JzC#qfkbmCw5v`$;U{TC>ipJo3lf z$^0<_;oh>x6%MpB(r<| zs3u?QO-Jf)09V%FEd#Y)vfj?6xJ4*!cJBY`7`Zw~nDzhcZJi9e+397qK$iC|ko@KMjklD+%cjBGi^#Z1 zZ8HDYYQip`BLyCQ!8wn>VCEpu@7^YMIBw^3l(%+z*L*%Z+$54u9HxuSvgaOp#!+Q zSG(y<4`9jDgxzchi%*}0uNAiy|9fwU@DX=RJ)ZB$)CnD9moij=)I8{4zqiIh_tm}RmUE3 zQGBt}vvZ7XQ_Q&Yhe)#b?am+6_<4Fgl%d{RYkcf7E!6S>LBI3-!Qb=~0}kkSgLjeu zaL4|QKO!u-+s`))alwrMGo??!Pit_2Ha9e1!p=9=oPs@Bl0xv$I3VPsflRCy92sQ# zR%nYW!6O&LLxCSp=1&8pw9_;TN&3Sml2T_AQrl^S@c@oASYAcC4>w3(SyJh}VWayy z-*_2iHfKxggLToOK4Z4kQ~ED+Z6;c#Q;2(#-| z^hof_vw14pj5Xw~W4+Fx63d3*m*B&rb>Lj_{-_I*~wSRp_2CKSFbPYfnt5G$2< zKUjU1?!(1dU8BRzXeSc+2o5oCb<;Fx=cDOZxmG+fChzNyBnvtFMjgCOcJiE-xOafb z3~&6F8tJN$m28i9eQ|6DAb-m+=_EW0R;_MJ=2{f&)cg1>JJUR|nOWzWd~XpWS;1#I z`HIku{`jhmX9Hg>>p|^=6qG67@FIwMGR)xn*4DGnPP`t#`Z% z2vt7st`|!fr01&SSYt<7*o(hz);7&hy)q(b1gPjq>V=s~bxIE4CTR$FO!wQiq`;l2 z)5`p&%fNL7BZX)tB&RJm&R!m`+a_b7tqyU`)_Ee?<~}Am@#Ic7)6xF}p*?~=rvey& z-dav)moejL>2rBCCc?~*qs#l7STb(KY8=(nU*zRlGs#-GF4kZY4Le9|evFQ+QrefH zXqsK6GGS9Q(_~B^LfJ5kD%p@gp`*oYHA|!BFdc8Lbhdm*#@q5osYYp(Ws0yS7_Wk&8iXfs^`*brFzx^5HRikC#|)9#>L|=e}j0DLq^OO?d)hf8L{bpi!Kq&6=I6 zGa?EwO4W{_X_ZhfC4^Oph`)jrr&_{{O?IUhyQ?-buzD)xT{V|3tVf&k(Yvth246Zg zJ8Q{7VBHYX5Yd)s);2gzv<$7;+oEEJlmrb1KJFqNG~!#`0%R8HPUZ@el9@I zY%xO!|7eq%4g%JerKs)=%TtV?rK4$-%>;_)FN#lJ#=jR-(Z&rMJ!x~Y^`#p+_)n5mo&7*@Yr)t}$l|QRTr~|Uta)1RO*5*|nW-97^`WDAwf2mR>BDrJW9ygSje(Eu z+ZrwAQ#Q1dBG52FiKmn0^C&l9n>{Im5tl9>g$YxTJkKg~L=r zXSrOoSMfTB9K!fo8rGc>CQAmT{UR6e8;~2RGL;u-thJ`(7s*(1!_(PQ$Y4 zICVP;QGLa(`UbI~W2>F~wO%slS7qV45=7M0EuH5Tv8veW?ueqQ&)#%r-%W#PGXMgD zh5?aobrj8@eAdY_ade%Th_Tj~*ph+R+cvE4>`cj=# z>oLS-7V0cwH`K>GWXO`ArIkqyIEh<{{8f6N;#dOE>dFeHE%S0S6 zP3%~=O?)Jqg{HNegQ{^%CGwyK2WMXEH*?XUQC220D`td$FcS~8a(+Rgjq1h-9w81H z*#$UQ^xA6i}J~v>8NE4nV;8R6{ns%nCtw*fR3bj*+mxFlsHz{W|G*~Lx5t@&?l+{tk zlz1yNuR8!}w)D&w^`XV#Fb&G()OuJ4r0ty}?J@ObEd9*c^TA3ZOee)tT`FZRI2Y>S zW$DtRRcT~U#E2ZYiVJJ|E5@Zw&(zR^(CaqwHS=tG;o0ay?}f6fCygme?nKn8{YzA3 z! z4B@JrygPtRUr454{Q!p8`d`o@JvFS8c#~*Z?E1@T&GS_hnFEH^s~7vt`SZsz<^f{! zTtn>k=-;FRt!eL|I;Pj2-F* zGs&Dai0~SJfvAh;YMKcfUx9#z2&1dTql5T$RN9ml+p&0Lj~r_@abBsMsobeSPTVU< zT;qk+-H7z?Dn!F?&H{tbj;FeqTC8AJ@+IiRkKTV3C*&-mSuyDI*h2^Uu zovE<67juz0O(23_ma=wL7qp~k2L$x(Wt>4jpe@^%4rc`CU2&hMgsh`Fg3;np@~1tB zNR_vF*biZg^YiH|QiMPpdz*su8CrGCp zLhIYwPCze!)97E)M3?ZxdBA5|UvN>|!H*sEdu2~f)$AUGF^g1s`W-+1m5Nw#gvI`yEk7n_lGXIXZ(jv1%%o=x`^(k zN<%YVe2{?wHXuQY*+0G@ODDyT(zr>_ zW#aCQCm0Ske>ho;vXe)jwg5JdiHzqa+$?j!3ABdFwro~nIczAqE>1`?16326*ct*I{UA-z2UcViY9pAH>UgD9({d$^*x;__yjC`M`&Ru2DZu$+neeip!I8gwKwjJN z=r?jooph~t(CSmmrap$zmC(2**RE2qfJY8|F_Nqc7YbT6F$;k^4fmfKQPM8drotZl zB5O2wI`ubO+az6a@>uES?pOho!fJ^{y10l?TMpu+IF!zrC9$#ejdE};R`WG;;F#1~ z;(?cmAl46#O7%D*b&PNAhb?NY(HhVX(#*Km=RqAW#J*jIOH?d77fi5%rA+{3)nWP&TXc;j*+$YZWq zQZCICpbUTKRE!0fKIc(q^mu$u;fBwkcZvoqt`#m5$|z} z+2s!Qb02+NEKzvj%5(WBIya*^tH)Rwd@ zBh)*^N9N%s>Ib+QP*L3_*-t^btc`$@#F{`CBF{xYQ+`V9r)c_gJ#LZ zP!@p}Hks1p_*m@5YE`=3RFs3)Dbgl5IEX^AQVDquD-?)6EbFc^oln8c=L(0w0&rE) zXncuxWFg;K%9+!o- z&g>cF@<0^@6kSCxTeJf)DI^S>n>Kh>p-C~mJo87!7I=RpEO8tY+H$sywr91QjMG8pCT zJk2Q#thsIA3zb~4>@s4pP%^j@wy5R*2R(4bo)Uc{p@rog+{jp~4FOL3_*h|3*OS}0 zzmj2Tz`h4dW!{~I?Bj+GqbtIJ6R&MF^d2uJWDGy=r`T6kKCckJ_G;;6Hr&33tawbUU)IzFUJwv*QH`rROcZflOSVphB4v7P#}O5G)3vk`Ud0EL zLLj`hAc4Kz%ss&#ft2EKpMwAd`gjrx0So&jR zxaF1-6@*1#6I(@;{RKyNw*a*#|If3%!SHZ}@XXS1!bdpsuLA`Jfv+qs-viz@Cjy_DL8?g! zg7&7R+S*@^dno+AAJ0h$KM(!bm-*4Hfw`C9xS#@m2Y}>f?SpzBzG0AtI=mr2n*aFu za$@w?Vx;pxWNaM@{ngF=rs6pl$i??-$5`Q7@14tFDf8Fz)-H&s_fsVI$xoWIM`4doS68y@gGI{aTECQ zwM?Ci{-_)Y8nidm`u%*q-ru{_K(8g{5cHmq_1ykG8U4*5U=Zs3o|rr9f$BAz{vdq|=urH&>^mI5+ zq<>-9ZT9nfO;{hp8RVzRTd!iXN7C@1+in{F_^j=RuiwGVZ=0{JolWmr!JW&SlxW}$ zNnso*3lb<>()`#WhG(b$P12@>5cb45apeDNU!XI=<<(ilXhwJ~H-W51dATv-W%Nd$ zInvnO)9e*%hI99dgSaQoRVcT`&cX3ya{DOT&(XQhRm0~I^=X1*i4F1|lKIs&hRWFk zT(UhACAJH!FzOe;JUyKfzIN0vKEG`jWa^N86v53@L3Pwk#9@zE1uGsvKC1w7@iskq z5E?8Jb{9>-4{18sF14(d+WV9I(*=2w9JaXg5k3(>p(73k-*gs}$!ugdI8SR*9c~_Y z9b-YDcr{P@#5`w80KoDf)9Hbsd6WDh-j}UMSokZm3d1l-V~>1ms@}vXj0qq;nJ#yd zmx14|QZ%6ER%F+xp`Y6t9Hi==Tys$6=n-}g$E|kD07vN#y0`>oG{-i-f#_^1g`E)E zr~Hkv{0cs!H|#qWP-SXb@G1x|q~EKhw8Py&(wnPSe__wea*qLsZz~sCgEoZT8UIOd z=f0kBN5fp3O%R=K0(5y9+VC(z z^Dcer9s@yF=16df?Dc$7U5;xNfPd~WwS_`{bp3q1IKOH6{JQp;7;3>BEjhU$bdR%c z74C+3NWPhFTjTFC35Qz`?@yo3Kx)yy?>uBMb|@kvFt^FVcA{RsY})J-c5V`yr=E@n z{FyY*YC79xCRmgvakvh9YYk1mFV*~1!`N&K#h*9E5Rt_tF^P;qLXiFs1Qkad5vN9L z(i^w`3m?^N$+{80!3m&4vuw}p5dA0zFD8yvz`5-pY4#O=ba2mKvY~oYd3&t5ieXUh zcDj8^L1g3uypV+0XT2tzeSKP+gb>Frcu$7YZu-8_{Bqc6bz!IoXinC4?W~TtDMrIH zV9lM+L%|Pa=ZD8f`HtYzeuZi1<4kr&HYccnEP^HvET$)?Ad^PDd+Ds4?q4jd^dN& z-NtF?wP?Sv`QdYR{J5Om2>B6@!a%|kc7dEH5)DWE@94q8>tOD(Ynx$C0xh^CCBw9s#CAa~4(p3>dt) zZ?ZvmAdgBR0PlfCVnP{`MPw10I`q#?|MaxGKc@Lna7H1+pSEk4`o6SwMzID^&49@N zMSXC0Na}}-5MD22YN^K3Ww_dOm{1{*_eE;iNz_8TWM+4-isF={QO!T4N!-z?en&Fc zCi`2281ev+e>u=b@qmh+jTx!x4wMP($W(2mnuLrW^TC5JY3P1t32t$Q}skx@O;Qd;(iNNQM^%7nS@r_ zrXSBvy88)Sr>|L<`6Zh!wGB<)F6DycXuaMi)S|X8SudU$EB@0DZ|Sf=6+Qf)m&3!U z^y!q8_1%S_F1px(`oV^h`m-0UwYZI8E2ctxgV@Z+fMZyUoT+ART7e9=Gokf}bS^y= zYLrf#I+l(m7-0hIiOet!bo3B4uo*%hH(0Z>`No#^%B5tcJ@RYu7{)DPA?LO;R@SUj z2~zxYFeSDvO{WbHt5W#k#d5gHX=p$e#~$7X3cSO}HBq`p!%mm3dOWLPjv z8uq_sJ4?j8^zJO%=wa3ztUY3@DsczPbD2`YaO^tIX)YCGU+hA(vcr~kRu)pm+BbnY zYIsQdoj2J!%PBe6RV;1!@H4x5phq(4b=Txv*N~iWRum~A6b@!$(wkQbuZ>$;yo+l# z&~R2l?3AzjPCmse8AstVa>9K*)$Sx)RfDQKb_lPK%<7n9V}+#d{(KjhKXST8xVBcK ze|Wza@mMW1&?HsqV{a@tK6sumUvylT$$Wn$568|Xll6@cgY_L5`i4DOn75^*yyqa8 zs@xq(*>)}b^E;A*D)(8|7w!$Jo{I*)xdEie*AYP0wxXiht9GfDrEg8{wyBa3+}JO) zzpT}Eig{DrWC(T8GEtd;69n0J9-IW^r5Wz&^{3tUZw#85mk7t2tEtCn%o2BM3oQx_IOSh*c7WKz`Q z!oW>S7JC^xW>y1FhoqLCv*=X^D6qPLVh!qh^c^=6X(v_uinxoHA%stby_c5 z?BWwGX*>9*)726-<233EmrBY$+Ao2SCFJGwjvctoY$@9dnPOuxTh>wEm8WwDV;&6U zmaVQLS{r8!HVu9@rAhTAx;`~jHtWXQ%@=OmKJ%}(9(&v8;<^e+J3+Ev>Z+ILq%o~V zvg$l%fc}WYUb8T{8JR`6(bhfDmKOR^lq?EQQwesXPafrS=DJ~(J-4AL9L$??7`3bV zA<1%5S#$dNEK?{k;YI8iWqdv!VigCno&XvjNZ6hWi0UH6!A+z=HnS8;9%ne3CASk(4X~lETy{)zvVB?Zy?nKgn$t&CJC+2Qk-g2pjoNvRp_$aEXg8GQ z_72a9o3!*%K|3jB`D!9FD;hS|W@iSjJo8DlQEaA1KZ1_o$mkw9y4hK-&$9Kymk+uH zXJ8t%Zte}OGw#F>&$bEOxT~TLPm2c+3xQdt(ue$#erU!gSmI%mor7xo;-je?io zqZt>8NrjgkC_!CJ;wj}uzk)WC9L_E3xD|giM|;Nl^%gAG9}JjtnU(7txEI=PSB{=|t$={seZX zJ9mfXHI)(ZF~B|I&9AYQD=%QfZdlEa?uPVNMxs>6VRwt_%WPf~%Q6|4DqW29Zgw`5 zj5yNyvP0k2oJZ#p+(ek8TA$oZQNQ!YhL>BEoyj?@$66UL4rtTa+#tWI>T=>f_4YG1E zYRAT_n49XaIj)xn`=3rlhrbqccIer3J8YeA89eG}Jk+?W zGj8n_ap~hdMKyIhty=Hy`|^nOpf0(11&yuwq%b_zso7U$a`-2<&zu(T(Rn|Fy(v zbK^oD`m9zNIxo~LV98LSp|Xz9D6LR&Xs+H0v7l?Dc6G# zNoTglZuxJKJ)8EEW{NATs;`P@hp=F!x&hbsdz2kbO}X^#&Opx<*Pn^us7J7z_tSvz zsv?Q0;Yf)ybr1?C&phv6SW1}j0CZkZx<4vy8YqJcLXIl)0fA_#4b$Eism#G8>atvKB!tkXi!OR zjB#PkI&#R^)or&#uP3CEY8p?Ww2-WHhK;GhTmgN-$Pe*kJkBQNB=^jm94dvGnsC~Jk#Tuzbb^*_0%QB-Ki$ArWo zH>Rg?{zJNw7}?k-C5)Oz4;I*-X_euTQ}{zd(^^DOVCHPvGYmy<_z8$65DvoSGwMwa zGP4p(9r%PrBu{hJ?b4*4T9Y0$8!%IxLKURCCmWlx&wWu~uuJ*+u4!29_MsQVrvfwO z=N_BzThww^d&eHcdN{SAIS`s=jlZIz$Lm6RhE3Vjg)&HD=7qZDU)5!SEJ5QuJV4Ac|P|!|VLd#?tNhKqzWjv91O9AEo@9ae1zHI4_keVwxfs_3wJ6Pto z(}H$5pB=VAQw4*{F0Uz!+U)u*AVWj=Z&DGMpVgEZI~PepXIs9^@e!7|OvP!$%iDzL z1)#jtm`ZeD6am)6#lpx1#Xa=m)I~9z<+9A|Jd-91x6@e%zXqH5uPqq~8AkbwyL-CJ zcW7;X*YP8I!DpH=isS*g;A$<3cs420NmQZwz8IFd{Y~au`4R@Tyok6V#BfT<50q6e zeqH}e`weV857Yf@J1W_wvjo1^#AxO)Tc)Y}-XPxemogPwn5bWh2!|EtNA9lCzTyNX3KLsHdJFL0_5}-lNR8c$& zhJzJ`tC(0V1@NcUO-s+IN~#0Q^YEIemmcYD63Qsn6)O5o~;Ea}& zd-`(26pHn1{bm91e=v_Ca6}c~?&bzmcwM6yN7V-pd^$KI{$@!t+J5yJ`l-vX0SC1Z z2M3y6`UGtDyc>1BhJ_;`Jg!-!XLR1pqeu!35eyybV^$ZXrHHp{#z4WSnx0mmLxbKQ z!g*V)g|)_{1jh5kO=!xzfqY6IV-?OLlY@QK307J|?tllik8Uken^4aSA<+2|7`xYW za4+mpdIkpT*~?juML~7l^g$JkegsP4E!kx*-@Ry&aldN5YeM@kCFXc=P)-x%u~pv4 zSbuA~Db*TpbPHsXTS^Zl=~^C1Lui@`9pBHBF1(6Ph z&~jv-e2& zbQlKb`u&Gewy;i4$RpRIG`?s^2pEAUOq_vLb)ZrW)oD!>pq1zD+5GA;ZyN{<3(fAi zJ01c@#4yAH*K#+00d9nYP8okf76Bn5NTWYhYo^?EOidl>Mdt~Tub5?5zyuaN(Ic`M zooCIY_e&UABL1X2&)(ccJbI3A+A$48B>scsFoP8P6JYyVP>~mELVB%e28-CC;t-65 zlBW2rSrG^uOD)zxJ4`y~E>52v%WG)ilG;LLV^`omKUJr5nf6Bm>Y1qD zKi=t^R}TuQNO$ZTWs$Z(tC#0<2H)3^26}5gqmH4pQPyNKj-aYkftj89f%0 zau$uG9HCo$8j-Rt`PrlDKtN+Zp2eF5VLr);%X7Y=4D1bo%ZDHbWP~gEsvkgVz^&Gb z$5S<|XL_5kAQFEyxBnrSBg4?}{_@%N67HeCm55;jeqnkR^Wcm?5IBZCh&xKg{E=BW zR3-F#%6CP#Xn?!W>oT-Li$Nn}Si4=H4-E9b054xB-~a#s literal 0 HcmV?d00001 diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 5a4f1fc3e..32219a139 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -98,9 +98,9 @@ -define(FORMAT_FUN, {?MODULE, format_delayed}). -define(NOW, erlang:system_time(milli_seconds)). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Mnesia bootstrap -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ mnesia(boot) -> ok = mria:create_table(?TAB, [ {type, ordered_set}, @@ -110,9 +110,9 @@ mnesia(boot) -> {attributes, record_info(fields, delayed_message)} ]). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Hooks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ on_message_publish( Msg = #message{ id = Id, @@ -143,9 +143,9 @@ on_message_publish( on_message_publish(Msg) -> {ok, Msg}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Start delayed publish server -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec start_link() -> emqx_types:startlink_ret(). start_link() -> @@ -270,9 +270,9 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) -> Enable = maps:get(enable, NewConf, undefined), load_or_unload(Enable). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callback -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> ok = mria:wait_for_tables([?TAB]), @@ -335,9 +335,9 @@ terminate(_Reason, #{stats_timer := StatsTimer} = State) -> code_change(_Vsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Telemetry -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{delayed_message_count => non_neg_integer()}. get_basic_usage_info() -> @@ -348,9 +348,9 @@ get_basic_usage_info() -> end, #{delayed_message_count => DelayedCount}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Ensure the stats -spec ensure_stats_event(state()) -> state(). diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 6bf8abb89..7f7955cf8 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -49,9 +49,12 @@ %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). -%%-------------------------------------------------------------------- +-define(update(_Rules_), + emqx_conf:update([rewrite], _Rules_, #{override_to => cluster}) +). +%%------------------------------------------------------------------------------ %% Load/Unload -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ enable() -> emqx_conf:add_handler([rewrite], ?MODULE), @@ -67,7 +70,7 @@ list() -> emqx_conf:get_raw([<<"rewrite">>], []). update(Rules0) -> - case emqx_conf:update([rewrite], Rules0, #{override_to => cluster}) of + case ?update(Rules0) of {ok, _} -> ok; {error, Reason} -> @@ -109,18 +112,19 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) -> Binds = fill_client_binds(Message), {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Telemetry -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}. get_basic_usage_info() -> RewriteRules = list(), #{topic_rewrite_rule_count => length(RewriteRules)}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ + compile(Rules) -> lists:foldl( fun(Rule, {Publish, Subscribe, Error}) -> diff --git a/apps/emqx_psk/src/emqx_psk.app.src b/apps/emqx_psk/src/emqx_psk.app.src index c3786bcc0..fc5fb707c 100644 --- a/apps/emqx_psk/src/emqx_psk.app.src +++ b/apps/emqx_psk/src/emqx_psk.app.src @@ -2,7 +2,7 @@ {application, emqx_psk, [ {description, "EMQX PSK"}, % strict semver, bump manually! - {vsn, "5.0.1"}, + {vsn, "5.0.2"}, {modules, []}, {registered, [emqx_psk_sup]}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 65bdeab48..6bdf48c9b 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -17,6 +17,8 @@ -module(emqx_psk). -behaviour(gen_server). +-behaviour(emqx_db_backup). +-behaviour(emqx_config_backup). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). @@ -48,6 +50,12 @@ insert_psks/1 ]). +%% Data backup +-export([ + import_config/1, + backup_tables/0 +]). + -record(psk_entry, { psk_id :: binary(), shared_secret :: binary(), @@ -86,6 +94,12 @@ mnesia(boot) -> {storage_properties, [{ets, [{read_concurrency, true}]}]} ]). +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +backup_tables() -> [?TAB]. + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -115,9 +129,27 @@ start_link() -> stop() -> gen_server:stop(?MODULE). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ +%% Data backup +%%------------------------------------------------------------------------------ + +import_config(#{<<"psk_authentication">> := PskConf}) -> + case emqx_conf:update([psk_authentication], PskConf, #{override_to => cluster}) of + {ok, _} -> + case get_config(enable) of + true -> load(); + false -> ok + end, + {ok, #{root_key => psk_authentication, changed => []}}; + Error -> + {error, #{root_key => psk_authentication, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => psk_authentication, changed => []}}. + +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init(_Opts) -> _ = diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index b81ea2446..b9a608f62 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -82,9 +82,9 @@ -callback clean(context()) -> ok. -callback size(context()) -> non_neg_integer(). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Hook API -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any(). on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined -> ok; @@ -118,9 +118,9 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) -> on_message_publish(Msg, _) -> {ok, Msg}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% APIs -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% @doc Start the retainer -spec start_link() -> emqx_types:startlink_ret(). @@ -169,9 +169,9 @@ call(Req) -> stats_fun() -> gen_server:cast(?MODULE, ?FUNCTION_NAME). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% APIs -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. get_basic_usage_info() -> @@ -183,9 +183,9 @@ get_basic_usage_info() -> #{retained_messages => 0} end. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> erlang:process_flag(trap_exit, true), @@ -248,9 +248,9 @@ terminate(_Reason, #{clear_timer := ClearTimer}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -spec new_state() -> state(). new_state() -> #{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 24ad2c5f0..83686cdcc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -18,6 +18,7 @@ -behaviour(gen_server). -behaviour(emqx_config_handler). +-behaiour(emqx_config_backup). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -78,6 +79,11 @@ code_change/3 ]). +%% Data backup +-export([ + import_config/1 +]). + -define(RULE_ENGINE, ?MODULE). -define(T_CALL, infinity). @@ -105,7 +111,7 @@ start_link() -> gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- %% The config handler for emqx_rule_engine %%------------------------------------------------------------------------------ post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) -> @@ -142,9 +148,9 @@ post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRule {error, Error} end. -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- %% APIs for rules -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- -spec load_rules() -> ok. load_rules() -> @@ -185,9 +191,9 @@ delete_rule(RuleId) when is_binary(RuleId) -> insert_rule(Rule) -> gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL). -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- %% Rule Management -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- -spec get_rules() -> [rule()]. get_rules() -> @@ -301,9 +307,9 @@ unload_hooks_for_rule(#{id := Id, from := Topics}) -> Topics ). -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- %% Telemetry helper functions -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- -spec get_basic_usage_info() -> #{ @@ -362,9 +368,27 @@ tally_referenced_bridges(BridgeIDs, Acc0) -> BridgeIDs ). -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- +%% Data backup +%%---------------------------------------------------------------------------------------- + +import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) -> + OldRules = emqx:get_raw_config(?KEY_PATH, #{}), + RuleEngineConf1 = RuleEngineConf#{<<"rules">> => maps:merge(OldRules, NewRules)}, + case emqx_conf:update([rule_engine], RuleEngineConf1, #{override_to => cluster}) of + {ok, #{raw_config := #{<<"rules">> := NewRawRules}}} -> + Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawRules, OldRules)), + ChangedPaths = [?RULE_PATH(Id) || Id <- maps:keys(Changed)], + {ok, #{root_key => rule_engine, changed => ChangedPaths}}; + Error -> + {error, #{root_key => rule_engine, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => rule_engine, changed => []}}. + +%%---------------------------------------------------------------------------------------- %% gen_server callbacks -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- init([]) -> _TableId = ets:new(?KV_TAB, [ @@ -404,9 +428,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- %% Internal Functions -%%------------------------------------------------------------------------------ +%%---------------------------------------------------------------------------------------- parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) -> case emqx_rule_sqlparser:parse(Sql) of diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index 1badabc38..c7888cd36 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -56,7 +56,8 @@ safe_to_existing_atom/2, pub_props_to_packet/1, safe_filename/1, - diff_lists/3 + diff_lists/3, + merge_lists/3 ]). -export([ @@ -819,6 +820,42 @@ diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) -> changed => lists:reverse(Changed) }. +%% @doc Merges two lists preserving the original order of elements in both lists. +%% KeyFunc must extract a unique key from each element. +%% If two keys exist in both lists, the value in List1 is superseded by the value in List2, but +%% the element position in the result list will equal its position in List1. +%% Example: +%% emqx_utils:merge_append_lists( +%% [#{id => a, val => old}, #{id => b, val => old}], +%% [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}], +%% fun(#{id := Id}) -> Id end). +%% [#{id => a,val => new}, +%% #{id => b,val => new}, +%% #{id => c}, +%% #{id => d}] +-spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when + KeyFunc :: fun((T) -> any()), + T :: any(). +merge_lists(List1, List2, KeyFunc) -> + WithKeysList2 = lists:map(fun(E) -> {KeyFunc(E), E} end, List2), + WithKeysList1 = lists:map( + fun(E) -> + K = KeyFunc(E), + case lists:keyfind(K, 1, WithKeysList2) of + false -> {K, E}; + WithKey1 -> WithKey1 + end + end, + List1 + ), + NewWithKeysList2 = lists:filter( + fun({K, _}) -> + not lists:keymember(K, 1, WithKeysList1) + end, + WithKeysList2 + ), + [E || {_, E} <- WithKeysList1 ++ NewWithKeysList2]. + search(_ExpectValue, _KeyFunc, []) -> false; search(ExpectValue, KeyFunc, [Item | List]) -> diff --git a/changes/ce/feat-10676.en.md b/changes/ce/feat-10676.en.md new file mode 100644 index 000000000..9628c0b74 --- /dev/null +++ b/changes/ce/feat-10676.en.md @@ -0,0 +1,4 @@ +Implement configuration and user data import/export CLI. + +The `emqx ctl export` and `emqx ctl import` commands allow to export configuration and built-in database +data from a running EMQX cluster and later import it to the same or another running EMQX cluster. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 1390f9bfe..90127e629 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -5,6 +5,7 @@ -behaviour(gen_server). -behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). -include("emqx_ee_schema_registry.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -13,9 +14,7 @@ %% API -export([ start_link/0, - get_serde/1, - add_schema/2, get_schema/1, delete_schema/1, @@ -34,6 +33,11 @@ %% `emqx_config_handler' API -export([post_config_update/5]). +%% Data backup +-export([ + import_config/1 +]). + -type schema() :: #{ type := serde_type(), source := binary(), @@ -129,7 +133,50 @@ post_config_update( {error, Reason, SerdesToRollback} -> lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), {error, Reason} - end. + end; +post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, OldConf, _AppEnvs) -> + OldSchemas = maps:get(schemas, OldConf, #{}), + #{ + added := Added, + changed := Changed0, + removed := Removed + } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas), + Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0), + RemovedNames = maps:keys(Removed), + case RemovedNames of + [] -> + ok; + _ -> + async_delete_serdes(RemovedNames) + end, + SchemasToBuild = maps:to_list(maps:merge(Changed, Added)), + case build_serdes(SchemasToBuild) of + ok -> + {ok, NewConf}; + {error, Reason, SerdesToRollback} -> + lists:foreach(fun ensure_serde_absent/1, SerdesToRollback), + {error, Reason} + end; +post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) -> + {ok, NewConf}. + +%%------------------------------------------------------------------------------------------------- +%% Data backup +%%------------------------------------------------------------------------------------------------- + +import_config(#{<<"schema_registry">> := #{<<"schemas">> := Schemas} = SchemaRegConf}) -> + OldSchemas = emqx:get_raw_config([?CONF_KEY_ROOT, schemas], #{}), + SchemaRegConf1 = SchemaRegConf#{<<"schemas">> => maps:merge(OldSchemas, Schemas)}, + case emqx_conf:update(?CONF_KEY_PATH, SchemaRegConf1, #{override_to => cluster}) of + {ok, #{raw_config := #{<<"schemas">> := NewRawSchemas}}} -> + Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawSchemas, OldSchemas)), + ChangedPaths = [[?CONF_KEY_ROOT, schemas, Name] || Name <- maps:keys(Changed)], + {ok, #{root_key => ?CONF_KEY_ROOT, changed => ChangedPaths}}; + Error -> + {error, #{root_key => ?CONF_KEY_ROOT, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}. %%------------------------------------------------------------------------------------------------- %% `gen_server' API diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index 195a54c15..85d35be1f 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -11,9 +11,13 @@ start(_StartType, _StartArgs) -> ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), + %% HTTP API handler emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), + %% Conf load / data import handler + emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry), emqx_ee_schema_registry_sup:start_link(). stop(_State) -> emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']), + emqx_conf:remove_handler(?CONF_KEY_PATH), ok. diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 71f7c7d8b..9167fed9e 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -700,3 +700,34 @@ t_cluster_serde_build(Config) -> ] ), ok. + +t_import_config(_Config) -> + RawConf = #{ + <<"schema_registry">> => + #{ + <<"schemas">> => + #{ + <<"my_avro_schema">> => + #{ + <<"description">> => <<"My Avro Schema">>, + <<"source">> => + <<"{\"type\":\"record\",\"fields\":[{\"type\":\"int\",\"name\":\"i\"},{\"type\":\"string\",\"name\":\"s\"}]}">>, + <<"type">> => <<"avro">> + } + } + } + }, + RawConf1 = emqx_utils_maps:deep_put( + [<<"schema_registry">>, <<"schemas">>, <<"my_avro_schema">>, <<"description">>], + RawConf, + <<"Updated description">> + ), + Path = [schema_registry, schemas, <<"my_avro_schema">>], + ?assertEqual( + {ok, #{root_key => schema_registry, changed => []}}, + emqx_ee_schema_registry:import_config(RawConf) + ), + ?assertEqual( + {ok, #{root_key => schema_registry, changed => [Path]}}, + emqx_ee_schema_registry:import_config(RawConf1) + ).