Merge pull request #10676 from SergeTupchiy/EMQX-9203-config-backup

feat: implement configuration and user data export/import
This commit is contained in:
SergeTupchiy 2023-06-09 14:18:39 +03:00 committed by GitHub
commit e61b2100a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 2018 additions and 140 deletions

View File

@ -0,0 +1,24 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Behaviour for modules that can import their own configuration
%% section from a backup file.
-module(emqx_config_backup).

%% Import a raw (binary-keyed) configuration map.
%% On success, returns the affected config root key together with the
%% list of config paths that actually changed; on failure, returns the
%% root key and the failure reason.
-callback import_config(RawConf :: map()) ->
    {ok, #{
        root_key => emqx_utils_maps:config_key(),
        changed => [emqx_utils_maps:config_path()]
    }}
    | {error, #{root_key => emqx_utils_maps:config_key(), reason => term()}}.

View File

@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Behaviour for modules that own Mria tables which should be
%% included in cluster data backups.
-module(emqx_db_backup).

%% Return the list of Mria tables this module wants backed up.
-callback backup_tables() -> [mria:table()].

View File

@ -17,6 +17,7 @@
-module(emqx_banned).
-behaviour(gen_server).
-behaviour(emqx_db_backup).
-include("emqx.hrl").
-include("logger.hrl").
@ -50,6 +51,8 @@
code_change/3
]).
-export([backup_tables/0]).
%% Internal exports (RPC)
-export([
expire_banned_items/1
@ -82,6 +85,11 @@ mnesia(boot) ->
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() -> [?BANNED_TAB].
%% @doc Start the banned server.
-spec start_link() -> startlink_ret().
start_link() ->

View File

@ -843,7 +843,9 @@ convert_certs(ListenerConf) ->
Listeners1 =
maps:fold(
fun(Name, Conf, Acc1) ->
Acc1#{Name => convert_certs(Type, Name, Conf)}
Conf1 = convert_certs(Type, Name, Conf),
Conf2 = convert_authn_certs(Type, Name, Conf1),
Acc1#{Name => Conf2}
end,
#{},
Listeners0
@ -866,6 +868,19 @@ convert_certs(Type, Name, Conf) ->
throw({bad_ssl_config, Reason})
end.
%% Convert certificates for every authenticator attached to a listener:
%% each authenticator's certs are materialized under its own certs dir
%% (derived from the listener chain name). Listener configs without an
%% `authentication' key are returned untouched.
convert_authn_certs(Type, Name, #{<<"authentication">> := AuthNs} = Conf) ->
    ChainName = listener_id(Type, Name),
    Converted = [
        emqx_authentication_config:convert_certs(
            emqx_authentication_config:certs_dir(ChainName, AuthN), AuthN
        )
     || AuthN <- AuthNs
    ],
    Conf#{<<"authentication">> => Converted};
convert_authn_certs(_Type, _Name, Conf) ->
    Conf.
%% @doc Reduce an error term to its reason: a `{Reason, Stacktrace}'
%% pair loses the stacktrace, anything else is passed through as-is.
filter_stacktrace(Error) ->
    case Error of
        {Reason, _Stacktrace} -> Reason;
        Other -> Other
    end.

View File

@ -33,11 +33,11 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%----------------------------------------------------------------------
% Based on `otp/lib/ssl/src/ssl_crl_cache.erl'
%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Based on `otp/lib/ssl/src/ssl_crl_cache.erl'
%%----------------------------------------------------------------------
%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Purpose: Simple default CRL cache
%%----------------------------------------------------------------------

View File

@ -859,6 +859,12 @@ setup_node(Node, Opts) when is_map(Opts) ->
%% Setting env before starting any applications
set_envs(Node, Env),
NodeDataDir = filename:join([
PrivDataDir,
node(),
integer_to_list(erlang:unique_integer())
]),
%% Here we start the apps
EnvHandlerForRpc =
fun(App) ->
@ -870,17 +876,10 @@ setup_node(Node, Opts) when is_map(Opts) ->
%% to avoid sharing data between executions and/or
%% nodes. these variables might not be in the
%% config file (e.g.: emqx_enterprise_schema).
NodeDataDir = filename:join([
PrivDataDir,
node(),
integer_to_list(erlang:unique_integer())
]),
Cookie = atom_to_list(erlang:get_cookie()),
os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
os:putenv("EMQX_NODE__COOKIE", Cookie),
emqx_config:init_load(SchemaMod),
os:unsetenv("EMQX_NODE__DATA_DIR"),
os:unsetenv("EMQX_NODE__COOKIE"),
application:set_env(emqx, init_config_load_done, true)
end,

View File

@ -16,6 +16,8 @@
-module(emqx_authn).
-behaviour(emqx_config_backup).
-export([
providers/0,
check_config/1,
@ -24,6 +26,11 @@
get_enabled_authns/0
]).
%% Data backup
-export([
import_config/1
]).
-include("emqx_authn.hrl").
providers() ->
@ -126,3 +133,32 @@ get_enabled_authns() ->
tally_authenticators(#{id := AuthenticatorName}, Acc) ->
maps:update_with(AuthenticatorName, fun(N) -> N + 1 end, 1, Acc).
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
-define(IMPORT_OPTS, #{override_to => cluster}).
%% @doc Import the `authentication' section from a backup config.
%% Imported authenticators are merged into the current raw list
%% (entries matched by authenticator id, merge semantics per
%% emqx_utils:merge_lists/3) and applied as a cluster-wide update.
import_config(RawConf) ->
    %% The raw backup may hold a single authenticator map or a list.
    AuthnList = authn_list(maps:get(?CONF_NS_BINARY, RawConf, [])),
    OldAuthnList = emqx:get_raw_config([?CONF_NS_BINARY], []),
    MergedAuthnList = emqx_utils:merge_lists(
        OldAuthnList, AuthnList, fun emqx_authentication:authenticator_id/1
    ),
    case emqx_conf:update([?CONF_NS_ATOM], MergedAuthnList, ?IMPORT_OPTS) of
        {ok, #{raw_config := NewRawConf}} ->
            {ok, #{root_key => ?CONF_NS_ATOM, changed => changed_paths(OldAuthnList, NewRawConf)}};
        Error ->
            {error, #{root_key => ?CONF_NS_ATOM, reason => Error}}
    end.
%% List config paths of authenticators whose raw config differs between
%% the old list and the newly applied one (entries keyed by
%% authenticator id).
changed_paths(OldAuthnList, NewAuthnList) ->
    KeyFun = fun emqx_authentication:authenticator_id/1,
    Changed = maps:get(changed, emqx_utils:diff_lists(NewAuthnList, OldAuthnList, KeyFun)),
    [[?CONF_NS_BINARY, emqx_authentication:authenticator_id(OldAuthn)] || {OldAuthn, _} <- Changed].
%% @doc Normalize a raw authenticator config to list form: a single
%% authenticator map is wrapped in a list, a list is kept as-is.
authn_list(#{} = Authenticator) ->
    [Authenticator];
authn_list(Authenticators) when is_list(Authenticators) ->
    Authenticators.

View File

@ -22,6 +22,7 @@
-behaviour(hocon_schema).
-behaviour(emqx_authentication).
-behaviour(emqx_db_backup).
-export([
namespace/0,
@ -54,6 +55,8 @@
group_match_spec/1
]).
-export([backup_tables/0]).
%% Internal exports (RPC)
-export([
do_destroy/1,
@ -101,6 +104,12 @@ mnesia(boot) ->
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
backup_tables() -> [?TAB].
%%------------------------------------------------------------------------------
%% Hocon Schema
%%------------------------------------------------------------------------------
@ -357,6 +366,9 @@ check_client_final_message(Bin, #{is_superuser := IsSuperuser} = Cache, #{algori
add_user(UserGroup, UserID, Password, IsSuperuser, State) ->
{StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State),
write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser).
write_user(UserGroup, UserID, StoredKey, ServerKey, Salt, IsSuperuser) ->
UserInfo = #user_info{
user_id = {UserGroup, UserID},
stored_key = StoredKey,

View File

@ -23,6 +23,7 @@
-behaviour(hocon_schema).
-behaviour(emqx_authentication).
-behaviour(emqx_db_backup).
-export([
namespace/0,
@ -66,6 +67,10 @@
import_csv/3
]).
-export([mnesia/1]).
-export([backup_tables/0]).
-type user_group() :: binary().
-type user_id() :: binary().
@ -76,8 +81,6 @@
is_superuser :: boolean()
}).
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-define(TAB, ?MODULE).
@ -103,6 +106,11 @@ mnesia(boot) ->
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
backup_tables() -> [?TAB].
%%------------------------------------------------------------------------------
%% Hocon Schema
%%------------------------------------------------------------------------------

View File

@ -15,7 +15,9 @@
%%--------------------------------------------------------------------
-module(emqx_authz).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include("emqx_authz.hrl").
-include_lib("emqx/include/logger.hrl").
@ -44,6 +46,13 @@
-export([acl_conf_file/0]).
%% Data backup
-export([
import_config/1,
maybe_read_acl_file/1,
maybe_write_acl_file/1
]).
-type source() :: map().
-type match_result() :: {matched, allow} | {matched, deny} | nomatch.
@ -326,9 +335,9 @@ init_metrics(Source) ->
)
end.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% AuthZ callbacks
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% @doc Check AuthZ
-spec authorize(
@ -451,9 +460,58 @@ do_authorize(
get_enabled_authzs() ->
lists:usort([Type || #{type := Type, enable := true} <- lookup()]).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
%% @doc Import the `authorization' section from a backup config.
%% Imported sources are merged into the current raw source list
%% (entries matched by source type, merge semantics per
%% emqx_utils:merge_lists/3) and the merged section is applied as a
%% cluster-wide update.
import_config(#{?CONF_NS_BINARY := AuthzConf}) ->
    Sources = maps:get(<<"sources">>, AuthzConf, []),
    OldSources = emqx:get_raw_config(?CONF_KEY_PATH, []),
    MergedSources = emqx_utils:merge_lists(OldSources, Sources, fun type/1),
    MergedAuthzConf = AuthzConf#{<<"sources">> => MergedSources},
    case emqx_conf:update([?CONF_NS_ATOM], MergedAuthzConf, #{override_to => cluster}) of
        {ok, #{raw_config := #{<<"sources">> := NewSources}}} ->
            {ok, #{
                root_key => ?CONF_NS_ATOM,
                changed => changed_paths(OldSources, NewSources)
            }};
        Error ->
            {error, #{root_key => ?CONF_NS_ATOM, reason => Error}}
    end;
%% Nothing to do when the section is absent from the backup.
import_config(_RawConf) ->
    {ok, #{root_key => ?CONF_NS_ATOM, changed => []}}.
%% List config paths of authz sources whose raw config differs between
%% the old source list and the newly applied one (entries keyed by
%% source type).
changed_paths(OldSources, NewSources) ->
    Changed = maps:get(changed, emqx_utils:diff_lists(NewSources, OldSources, fun type/1)),
    [?CONF_KEY_PATH ++ [type(OldSource)] || {OldSource, _} <- Changed].
%% @doc On export: inline the rules of every file-type authz source
%% (see read_acl_file/1) so the backup is self-contained.
maybe_read_acl_file(RawConf) ->
    maybe_convert_acl_file(RawConf, fun read_acl_file/1).
%% @doc On import: apply write_acl_file/1 (defined elsewhere in this
%% module) to every file-type authz source, presumably writing the
%% inlined rules back to disk.
maybe_write_acl_file(RawConf) ->
    maybe_convert_acl_file(RawConf, fun write_acl_file/1).
%% Apply ConvertFun to every file-type source in the raw authorization
%% config; non-file sources, and configs without a sources list, are
%% passed through untouched.
maybe_convert_acl_file(
    #{?CONF_NS_BINARY := #{<<"sources">> := Sources} = AuthzRawConf} = RawConf, ConvertFun
) ->
    Converted = [
        case Source of
            #{<<"type">> := <<"file">>} -> ConvertFun(Source);
            _ -> Source
        end
     || Source <- Sources
    ],
    RawConf#{?CONF_NS_BINARY => AuthzRawConf#{<<"sources">> => Converted}};
maybe_convert_acl_file(RawConf, _ConvertFun) ->
    RawConf.
%% Inline an ACL file: read the rules from `path', store them under
%% `rules', and drop the `path' key. Crashes (badmatch) if the file
%% cannot be read, failing the export loudly.
read_acl_file(#{<<"path">> := Path} = Source) ->
    {ok, Rules} = emqx_authz_file:read_file(Path),
    maps:remove(<<"path">>, Source#{<<"rules">> => Rules}).
%%------------------------------------------------------------------------------
%% Internal function
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
client_info_source() ->
emqx_authz_client_info:create(

View File

@ -42,6 +42,7 @@
}).
-behaviour(emqx_authz).
-behaviour(emqx_db_backup).
%% AuthZ Callbacks
-export([
@ -65,6 +66,8 @@
record_count/0
]).
-export([backup_tables/0]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
@ -119,6 +122,12 @@ authorize(
end,
do_authorize(Client, PubSub, Topic, Rules).
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() -> [?ACL_TABLE].
%%--------------------------------------------------------------------
%% Management API
%%--------------------------------------------------------------------

View File

@ -16,6 +16,8 @@
-module(emqx_auto_subscribe).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/emqx_hooks.hrl").
-behaviour(emqx_config_handler).
@ -24,7 +26,6 @@
-define(MAX_AUTO_SUBSCRIBE, 20).
%
-export([load/0, unload/0]).
-export([
@ -40,6 +41,11 @@
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%% Data backup
-export([
import_config/1
]).
load() ->
ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
update_hook().
@ -73,8 +79,9 @@ post_config_update(_KeyPath, _Req, NewTopics, _OldConf, _AppEnvs) ->
Config = emqx_conf:get([auto_subscribe], #{}),
update_hook(Config#{topics => NewTopics}).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% hook
%%------------------------------------------------------------------------------
on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
case erlang:apply(TopicHandler, handle, [ClientInfo, ConnInfo, Options]) of
@ -87,17 +94,38 @@ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
on_client_connected(_, _, _) ->
ok.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}.
get_basic_usage_info() ->
AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []),
#{auto_subscribe_count => length(AutoSubscribe)}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
%% @doc Import the `auto_subscribe.topics' section from a backup
%% config. Imported topics are merged into the existing list (entries
%% matched by topic filter, merge semantics per
%% emqx_utils:merge_lists/3) and applied as a cluster-wide update.
import_config(#{<<"auto_subscribe">> := #{<<"topics">> := Topics}}) ->
    ConfPath = [auto_subscribe, topics],
    OldTopics = emqx:get_raw_config(ConfPath, []),
    KeyFun = fun(#{<<"topic">> := T}) -> T end,
    MergedTopics = emqx_utils:merge_lists(OldTopics, Topics, KeyFun),
    case emqx_conf:update(ConfPath, MergedTopics, #{override_to => cluster}) of
        {ok, #{raw_config := NewTopics}} ->
            %% Report only the topics whose raw config actually changed.
            Changed = maps:get(changed, emqx_utils:diff_lists(NewTopics, OldTopics, KeyFun)),
            Changed1 = [ConfPath ++ [T] || {#{<<"topic">> := T}, _} <- Changed],
            {ok, #{root_key => auto_subscribe, changed => Changed1}};
        Error ->
            {error, #{root_key => auto_subscribe, reason => Error}}
    end;
%% Nothing to do when the section is absent from the backup.
import_config(_RawConf) ->
    {ok, #{root_key => auto_subscribe, changed => []}}.
%%------------------------------------------------------------------------------
%% internal
%%------------------------------------------------------------------------------
format(Rules) when is_list(Rules) ->
[format(Rule) || Rule <- Rules];

View File

@ -14,13 +14,19 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([post_config_update/5]).
-export([
pre_config_update/3,
post_config_update/5
]).
-export([
load_hook/0,
@ -53,6 +59,11 @@
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%% Data backup
-export([
import_config/1
]).
-define(EGRESS_DIR_BRIDGES(T),
T == webhook;
T == mysql;
@ -80,8 +91,10 @@
T == iotdb
).
-define(ROOT_KEY, bridges).
load() ->
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
fun({Type, NamedConf}) ->
lists:foreach(
@ -98,7 +111,7 @@ load() ->
unload() ->
unload_hook(),
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx:get_config([?ROOT_KEY], #{}),
lists:foreach(
fun({Type, NamedConf}) ->
lists:foreach(
@ -139,7 +152,7 @@ reload_hook(Bridges) ->
ok = load_hook(Bridges).
load_hook() ->
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx:get_config([?ROOT_KEY], #{}),
load_hook(Bridges).
load_hook(Bridges) ->
@ -210,7 +223,7 @@ send_message(BridgeId, Message) ->
send_message(BridgeType, BridgeName, ResId, Message).
send_message(BridgeType, BridgeName, ResId, Message) ->
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
not_found ->
{error, bridge_not_found};
#{enable := true} = Config ->
@ -231,9 +244,14 @@ query_opts(Config) ->
end.
config_key_path() ->
[bridges].
[?ROOT_KEY].
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
{ok, RawConf};
pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
{ok, convert_certs(NewConf)}.
post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated} =
diff_confs(NewConf, OldConf),
%% The config update will be failed if any task in `perform_bridge_changes` failed.
@ -351,10 +369,74 @@ check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
remove(BridgeType, BridgeName)
end.
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
%% @doc Import the `bridges' section from a backup config. Existing and
%% imported bridges are merged per type and name (see merge_confs/2;
%% imported same-named bridges replace existing ones) and the result is
%% applied as a cluster-wide update.
import_config(RawConf) ->
    RootKeyPath = config_key_path(),
    BridgesConf = maps:get(<<"bridges">>, RawConf, #{}),
    OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}),
    MergedConf = merge_confs(OldBridgesConf, BridgesConf),
    case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
        {ok, #{raw_config := NewRawConf}} ->
            {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}};
        Error ->
            {error, #{root_key => ?ROOT_KEY, reason => Error}}
    end.
%% Merge two raw bridge config maps of shape `#{Type => #{Name => Conf}}'.
%% Every type present in either map is kept; within a type, bridges
%% from NewConf override same-named bridges from OldConf.
merge_confs(OldConf, NewConf) ->
    Types = maps:keys(maps:merge(OldConf, NewConf)),
    maps:from_list([
        {Type,
            maps:merge(
                maps:get(Type, OldConf, #{}),
                maps:get(Type, NewConf, #{})
            )}
     || Type <- Types
    ]).
%% List `[bridges, Type, Name]' config paths for every bridge whose raw
%% config differs between the old config and the newly applied one.
changed_paths(OldRawConf, NewRawConf) ->
    maps:fold(
        fun(Type, Bridges, ChangedAcc) ->
            OldBridges = maps:get(Type, OldRawConf, #{}),
            Changed = maps:get(changed, emqx_utils_maps:diff_maps(Bridges, OldBridges)),
            [[?ROOT_KEY, Type, K] || K <- maps:keys(Changed)] ++ ChangedAcc
        end,
        [],
        NewRawConf
    ).
%%========================================================================================
%% Helper functions
%%========================================================================================
%% Run cert conversion for every bridge in the raw `bridges' config,
%% using a per-bridge path of `bridges/Type/Name'. Throws
%% `{bad_ssl_config, Reason}' (after logging) if any bridge's SSL
%% config fails to convert, aborting the whole config update.
convert_certs(BridgesConf) ->
    maps:map(
        fun(Type, Bridges) ->
            maps:map(
                fun(Name, BridgeConf) ->
                    Path = filename:join([?ROOT_KEY, Type, Name]),
                    case emqx_connector_ssl:convert_certs(Path, BridgeConf) of
                        {error, Reason} ->
                            ?SLOG(error, #{
                                msg => "bad_ssl_config",
                                type => Type,
                                name => Name,
                                reason => Reason
                            }),
                            throw({bad_ssl_config, Reason});
                        {ok, BridgeConf1} ->
                            BridgeConf1
                    end
                end,
                Bridges
            )
        end,
        BridgesConf
    ).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).

View File

@ -1,4 +1,4 @@
%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

View File

@ -1,4 +1,4 @@
%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

View File

@ -24,6 +24,8 @@
-boot_mnesia({mnesia, [boot]}).
-behaviour(emqx_db_backup).
%% Mnesia bootstrap
-export([mnesia/1]).
@ -54,6 +56,8 @@
default_username/0
]).
-export([backup_tables/0]).
-type emqx_admin() :: #?ADMIN{}.
-define(BOOTSTRAP_USER_TAG, <<"bootstrap user">>).
@ -76,6 +80,12 @@ mnesia(boot) ->
]}
]).
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() -> [?ADMIN].
%%--------------------------------------------------------------------
%% bootstrap API
%%--------------------------------------------------------------------

View File

@ -18,6 +18,7 @@
-module(emqx_exhook_mgr).
-behaviour(gen_server).
-behaviour(emqx_config_backup).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
@ -66,6 +67,11 @@
-export([roots/0]).
%% Data backup
-export([
import_config/1
]).
%% Running servers
-type state() :: #{servers := servers()}.
@ -98,9 +104,9 @@
-export_type([servers/0, server/0]).
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-spec start_link() ->
ignore
@ -137,7 +143,7 @@ call(Req) ->
init_ref_counter_table() ->
_ = ets:new(?HOOKS_REF_COUNTER, [named_table, public]).
%%=====================================================================
%%========================================================================================
%% Hocon schema
roots() ->
emqx_exhook_schema:server_config().
@ -179,9 +185,30 @@ post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
Result = call({update_config, UpdateReq, NewConf, OldConf}),
{ok, Result}.
%%--------------------------------------------------------------------
%%========================================================================================
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
%% @doc Import the `exhook.servers' section from a backup config.
%% Imported servers are merged into the existing list (entries matched
%% by server name, merge semantics per emqx_utils:merge_lists/3) and
%% applied as a cluster-wide update.
import_config(#{<<"exhook">> := #{<<"servers">> := Servers} = ExHook}) ->
    OldServers = emqx:get_raw_config(?SERVERS, []),
    KeyFun = fun(#{<<"name">> := Name}) -> Name end,
    ExHook1 = ExHook#{<<"servers">> => emqx_utils:merge_lists(OldServers, Servers, KeyFun)},
    case emqx_conf:update(?EXHOOK, ExHook1, #{override_to => cluster}) of
        {ok, #{raw_config := #{<<"servers">> := NewRawServers}}} ->
            %% Report only the servers whose raw config actually changed.
            Changed = maps:get(changed, emqx_utils:diff_lists(NewRawServers, OldServers, KeyFun)),
            ChangedPaths = [?SERVERS ++ [Name] || {#{<<"name">> := Name}, _} <- Changed],
            {ok, #{root_key => ?EXHOOK, changed => ChangedPaths}};
        Error ->
            {error, #{root_key => ?EXHOOK, reason => Error}}
    end;
%% Nothing to do when the section is absent from the backup.
import_config(_RawConf) ->
    {ok, #{root_key => ?EXHOOK, changed => []}}.
%%----------------------------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
init([]) ->
process_flag(trap_exit, true),
@ -333,9 +360,9 @@ terminate(Reason, State = #{servers := Servers}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
unload_exhooks() ->
[
@ -572,7 +599,7 @@ update_servers(Servers, State) ->
set_disable(Server) ->
Server#{status := disabled, timer := undefined}.
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Server state persistent
save(Name, ServerState) ->
Saved = persistent_term:get(?APP, []),

View File

@ -18,6 +18,7 @@
-module(emqx_gateway_conf).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
%% Load/Unload
-export([
@ -64,6 +65,11 @@
post_config_update/5
]).
%% Data backup
-export([
import_config/1
]).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_authentication.hrl").
-define(AUTHN_BIN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
@ -76,9 +82,9 @@
-define(IS_SSL(T), (T == <<"ssl_options">> orelse T == <<"dtls_options">>)).
-define(IGNORE_KEYS, [<<"listeners">>, ?AUTHN_BIN]).
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-define(GATEWAY, [gateway]).
-spec load() -> ok.
@ -89,7 +95,7 @@ load() ->
unload() ->
emqx_conf:remove_handler(?GATEWAY).
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% APIs
-spec load_gateway(atom_or_bin(), map()) -> map_or_err().
@ -365,9 +371,26 @@ ret_listener_or_err(GwName, {LType, LName}, {ok, #{raw_config := GwConf}}) ->
ret_listener_or_err(_, _, Err) ->
Err.
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
%% @doc Import the `gateway' section from a backup config. Imported
%% top-level gateway entries are shallow-merged over the existing raw
%% config (imported entries replace same-named ones) and applied as a
%% cluster-wide update.
import_config(RawConf) ->
    GatewayConf = maps:get(<<"gateway">>, RawConf, #{}),
    OldGatewayConf = emqx:get_raw_config([<<"gateway">>], #{}),
    MergedConf = maps:merge(OldGatewayConf, GatewayConf),
    case emqx_conf:update([gateway], MergedConf, #{override_to => cluster}) of
        {ok, #{raw_config := NewRawConf}} ->
            %% Report the gateways whose raw config actually changed.
            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, OldGatewayConf)),
            ChangedPaths = [[gateway, GwName] || GwName <- maps:keys(Changed)],
            {ok, #{root_key => gateway, changed => ChangedPaths}};
        Error ->
            {error, #{root_key => gateway, reason => Error}}
    end.
%%----------------------------------------------------------------------------------------
%% Config Handler
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-spec pre_config_update(
list(atom()),
@ -793,9 +816,9 @@ post_config_update(?GATEWAY, _Req = #{}, NewConfig, OldConfig, _AppEnvs) ->
),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Internal funcs
%%----------------------------------------------------------------------------------------
tune_gw_certs(Fun, GwName, Conf) ->
apply_to_gateway_basic_confs(

View File

@ -183,7 +183,7 @@ delete(Keys, Fields) ->
lists:foldl(fun(Key, Acc) -> lists:keydelete(Key, 1, Acc) end, Fields, Keys).
api_key(get, _) ->
{200, [format(App) || App <- emqx_mgmt_auth:list()]};
{200, [emqx_mgmt_auth:format(App) || App <- emqx_mgmt_auth:list()]};
api_key(post, #{body := App}) ->
#{
<<"name">> := Name,
@ -194,7 +194,7 @@ api_key(post, #{body := App}) ->
Desc = unicode:characters_to_binary(Desc0, unicode),
case emqx_mgmt_auth:create(Name, Enable, ExpiredAt, Desc) of
{ok, NewApp} ->
{200, format(NewApp)};
{200, emqx_mgmt_auth:format(NewApp)};
{error, Reason} ->
{400, #{
code => 'BAD_REQUEST',
@ -206,7 +206,7 @@ api_key(post, #{body := App}) ->
api_key_by_name(get, #{bindings := #{name := Name}}) ->
case emqx_mgmt_auth:read(Name) of
{ok, App} -> {200, format(App)};
{ok, App} -> {200, emqx_mgmt_auth:format(App)};
{error, not_found} -> {404, ?NOT_FOUND_RESPONSE}
end;
api_key_by_name(delete, #{bindings := #{name := Name}}) ->
@ -219,20 +219,9 @@ api_key_by_name(put, #{bindings := #{name := Name}, body := Body}) ->
ExpiredAt = ensure_expired_at(Body),
Desc = maps:get(<<"desc">>, Body, undefined),
case emqx_mgmt_auth:update(Name, Enable, ExpiredAt, Desc) of
{ok, App} -> {200, format(App)};
{ok, App} -> {200, emqx_mgmt_auth:format(App)};
{error, not_found} -> {404, ?NOT_FOUND_RESPONSE}
end.
format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
ExpiredAt =
case ExpiredAt0 of
infinity -> <<"infinity">>;
_ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
end,
App#{
expired_at => ExpiredAt,
created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
}.
ensure_expired_at(#{<<"expired_at">> := ExpiredAt}) when is_integer(ExpiredAt) -> ExpiredAt;
ensure_expired_at(_) -> infinity.

View File

@ -19,7 +19,6 @@
-behaviour(minirest_api).
-export([namespace/0, api_spec/0, paths/0, schema/1, fields/1]).
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
-export([
listener_type_status/2,
@ -36,6 +35,16 @@
do_list_listeners/0
]).
-import(emqx_dashboard_swagger, [error_codes/2, error_codes/1]).
-import(emqx_mgmt_listeners_conf, [
action/4,
create/3,
ensure_remove/2,
get_raw/2,
update/3
]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -44,7 +53,6 @@
-define(LISTENER_NOT_FOUND, <<"Listener id not found">>).
-define(LISTENER_ID_INCONSISTENT, <<"Path and body's listener id not match">>).
-define(ADDR_PORT_INUSE, <<"Addr port in use">>).
-define(OPTS(_OverrideTo_), #{rawconf_with_defaults => true, override_to => _OverrideTo_}).
namespace() -> "listeners".
@ -387,14 +395,13 @@ crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
case parse_listener_conf(Body0) of
{Id, Type, Name, Conf} ->
Path = [listeners, Type, Name],
case emqx_conf:get_raw(Path, undefined) of
case get_raw(Type, Name) of
undefined ->
{404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
PrevConf ->
MergeConfT = emqx_utils_maps:deep_merge(PrevConf, Conf),
MergeConf = emqx_listeners:ensure_override_limiter_conf(MergeConfT, Conf),
case update(Path, MergeConf) of
case update(Type, Name, MergeConf) of
{ok, #{raw_config := _RawConf}} ->
crud_listeners_by_id(get, #{bindings => #{id => Id}});
{error, not_found} ->
@ -412,7 +419,7 @@ crud_listeners_by_id(post, #{body := Body}) ->
create_listener(Body);
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
case ensure_remove([listeners, Type, Name]) of
case ensure_remove(Type, Name) of
{ok, _} -> {204};
{error, Reason} -> {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
end.
@ -457,12 +464,11 @@ restart_listeners_by_id(Method, Body = #{bindings := Bindings}) ->
action_listeners_by_id(post, #{bindings := #{id := Id, action := Action}}) ->
{ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
Path = [listeners, Type, Name],
case emqx_conf:get_raw(Path, undefined) of
case get_raw(Type, Name) of
undefined ->
{404, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_NOT_FOUND}};
_PrevConf ->
case action(Path, Action, enabled(Action)) of
case action(Type, Name, Action, enabled(Action)) of
{ok, #{raw_config := _RawConf}} ->
{200};
{error, not_found} ->
@ -634,23 +640,6 @@ max_conn(_Int1, <<"infinity">>) -> <<"infinity">>;
max_conn(<<"infinity">>, _Int) -> <<"infinity">>;
max_conn(Int1, Int2) -> Int1 + Int2.
update(Path, Conf) ->
wrap(emqx_conf:update(Path, {update, Conf}, ?OPTS(cluster))).
action(Path, Action, Conf) ->
wrap(emqx_conf:update(Path, {action, Action, Conf}, ?OPTS(cluster))).
create(Path, Conf) ->
wrap(emqx_conf:update(Path, {create, Conf}, ?OPTS(cluster))).
ensure_remove(Path) ->
wrap(emqx_conf:tombstone(Path, ?OPTS(cluster))).
wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};
wrap({error, Reason}) -> {error, Reason};
wrap(Ok) -> Ok.
listener_type_status_example() ->
[
#{
@ -813,8 +802,7 @@ tcp_schema_example() ->
create_listener(Body) ->
case parse_listener_conf(Body) of
{Id, Type, Name, Conf} ->
Path = [listeners, Type, Name],
case create(Path, Conf) of
case create(Type, Name, Conf) of
{ok, #{raw_config := _RawConf}} ->
crud_listeners_by_id(get, #{bindings => #{id => Id}});
{error, already_exist} ->

View File

@ -17,6 +17,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(emqx_db_backup).
%% API
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
@ -28,12 +30,15 @@
update/4,
delete/1,
list/0,
init_bootstrap_file/0
init_bootstrap_file/0,
format/1
]).
-export([authorize/3]).
-export([post_config_update/5]).
-export([backup_tables/0]).
%% Internal exports (RPC)
-export([
do_update/4,
@ -67,6 +72,12 @@ mnesia(boot) ->
{attributes, record_info(fields, ?APP)}
]).
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() -> [?APP].
post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) ->
#{bootstrap_file := File} = NewConf,
case init_bootstrap_file(File) of
@ -127,6 +138,17 @@ do_delete(Name) ->
[_App] -> mnesia:delete({?APP, Name})
end.
%% Format an API-key map for display: render `created_at' (and `expired_at'
%% unless it is `infinity') as RFC3339 binaries; other fields pass through.
format(App = #{expired_at := ExpiredAt0, created_at := CreateAt}) ->
    ExpiredAt =
        case ExpiredAt0 of
            %% Never-expiring keys are shown as the literal binary.
            infinity -> <<"infinity">>;
            _ -> list_to_binary(calendar:system_time_to_rfc3339(ExpiredAt0))
        end,
    App#{
        expired_at => ExpiredAt,
        created_at => list_to_binary(calendar:system_time_to_rfc3339(CreateAt))
    }.
list() ->
to_map(ets:match_object(?APP, #?APP{_ = '_'})).

View File

@ -25,6 +25,7 @@
-include("emqx_mgmt.hrl").
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~ts~n", [Cmd, Descr])).
-define(DATA_BACKUP_OPTS, #{print_fun => fun emqx_ctl:print/2}).
-export([load/0]).
@ -44,7 +45,8 @@
log/1,
authz/1,
pem_cache/1,
olp/1
olp/1,
data/1
]).
-define(PROC_INFOKEYS, [
@ -739,6 +741,37 @@ olp(_) ->
{"olp disable", "Disable overload protection"}
]).
%%--------------------------------------------------------------------
%% @doc data Command
%% @doc `data' CLI command: export/import EMQX data backups.
%% `data export' creates a backup archive; `data import <File>' restores one.
%% Any other argument list prints usage.
data(["export"]) ->
    case emqx_mgmt_data_backup:export(?DATA_BACKUP_OPTS) of
        {ok, #{filename := Filename}} ->
            emqx_ctl:print("Data has been successfully exported to ~s.~n", [Filename]);
        {error, Reason} ->
            Reason1 = emqx_mgmt_data_backup:format_error(Reason),
            emqx_ctl:print("[error] Data export failed, reason: ~p.~n", [Reason1])
    end;
data(["import", Filename]) ->
    case emqx_mgmt_data_backup:import(Filename, ?DATA_BACKUP_OPTS) of
        {ok, #{db_errors := DbErrs, config_errors := ConfErrs}} when
            map_size(DbErrs) =:= 0, map_size(ConfErrs) =:= 0
        ->
            emqx_ctl:print("Data has been imported successfully.~n");
        {ok, _} ->
            %% Partial success: per-item errors were already logged/printed.
            %% (Fixed duplicated word "the the" in the original message.)
            emqx_ctl:print(
                "Data has been imported, but some errors occurred, see the log above.~n"
            );
        {error, Reason} ->
            Reason1 = emqx_mgmt_data_backup:format_error(Reason),
            emqx_ctl:print("[error] Data import failed, reason: ~p.~n", [Reason1])
    end;
data(_) ->
    emqx_ctl:usage([
        {"data import <File>", "Import data from the specified tar archive file"},
        {"data export", "Export data"}
    ]).
%%--------------------------------------------------------------------
%% Dump ETS
%%--------------------------------------------------------------------

View File

@ -0,0 +1,690 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup).
-export([
export/0,
export/1,
import/1,
import/2,
format_error/1
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
-include_lib("kernel/include/file.hrl").
-include_lib("emqx/include/logger.hrl").
-define(ROOT_BACKUP_DIR, "backup").
-define(BACKUP_MNESIA_DIR, "mnesia").
-define(TAR_SUFFIX, ".tar.gz").
-define(META_FILENAME, "META.hocon").
-define(CLUSTER_HOCON_FILENAME, "cluster.hocon").
-define(CONF_KEYS, [
<<"delayed">>,
<<"rewrite">>,
<<"retainer">>,
<<"mqtt">>,
<<"alarm">>,
<<"sysmon">>,
<<"sys_topics">>,
<<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"prometheus">>,
<<"crl_cache">>,
<<"conn_congestion">>,
<<"force_shutdown">>,
<<"flapping_detect">>,
<<"broker">>,
<<"force_gc">>,
<<"zones">>
]).
-define(DEFAULT_OPTS, #{}).
-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX).
-define(fmt_tar_err(_Expr_),
fun() ->
case _Expr_ of
{error, _Reason_} -> {error, erl_tar:format_error(_Reason_)};
_Other_ -> _Other_
end
end()
).
-type backup_file_info() :: #{
filename => binary(),
size => non_neg_integer(),
created_at => binary(),
node => node(),
atom() => _
}.
-type db_error_details() :: #{mria:table() => {error, _}}.
-type config_error_details() :: #{emqx_utils_maps:config_path() => {error, _}}.
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% @doc Export all data (cluster config plus backed-up mnesia tables) with
%% default options (no progress printing).
-spec export() -> {ok, backup_file_info()} | {error, _}.
export() ->
    export(?DEFAULT_OPTS).

%% @doc Export all data into a timestamped `<name>.tar.gz' archive under the
%% root backup dir. `Opts' may carry `print_fun' for progress output.
%% Any exception is logged and converted into `{error, Reason}'.
-spec export(map()) -> {ok, backup_file_info()} | {error, _}.
export(Opts) ->
    {BackupName, TarDescriptor} = prepare_new_backup(Opts),
    try
        do_export(BackupName, TarDescriptor, Opts)
    catch
        Class:Reason:Stack ->
            ?SLOG(error, #{
                msg => "emqx_data_export_failed",
                exception => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            {error, Reason}
    after
        %% erl_tar:close/1 raises error if called on an already closed tar
        catch erl_tar:close(TarDescriptor),
        %% Remove the temporary per-backup work directory in any case.
        file:del_dir_r(BackupName)
    end.

%% @doc Import a backup archive with default options.
-spec import(file:filename_all()) ->
    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
    | {error, _}.
import(BackupFileName) ->
    import(BackupFileName, ?DEFAULT_OPTS).

%% @doc Import a backup archive given as a path, or as a bare file name which
%% is then looked up in the root backup dir (see lookup_file/1).
%% Only allowed when this node's mria role is `core'.
-spec import(file:filename_all(), map()) ->
    {ok, #{db_errors => db_error_details(), config_errors => config_error_details()}}
    | {error, _}.
import(BackupFileName, Opts) ->
    case is_import_allowed() of
        true ->
            case lookup_file(str(BackupFileName)) of
                {ok, FilePath} ->
                    do_import(FilePath, Opts);
                Err ->
                    Err
            end;
        false ->
            {error, not_core_node}
    end.
%% @doc Translate machine-matchable error reasons returned by export/import
%% into human-readable strings for the CLI/API. Unknown reasons pass through.
format_error(not_core_node) ->
    str(
        io_lib:format(
            "backup data import is only allowed on core EMQX nodes, but requested node ~p is not core",
            [node()]
        )
    );
format_error(ee_to_ce_backup) ->
    "importing EMQX Enterprise data backup to EMQX is not allowed";
format_error(missing_backup_meta) ->
    "invalid backup archive file: missing " ?META_FILENAME;
format_error(invalid_edition) ->
    "invalid backup archive content: wrong EMQX edition value in " ?META_FILENAME;
format_error(invalid_version) ->
    "invalid backup archive content: wrong EMQX version value in " ?META_FILENAME;
format_error(bad_archive_dir) ->
    "invalid backup archive content: all files in the archive must be under <backup name> directory";
format_error(not_found) ->
    "backup file not found";
format_error(bad_backup_name) ->
    "invalid backup name: file name must have " ?TAR_SUFFIX " extension";
format_error({unsupported_version, ImportVersion}) ->
    str(
        io_lib:format(
            "[warning] Backup version ~p is newer than EMQX version ~p, import is not allowed.~n",
            [str(ImportVersion), str(emqx_release:version())]
        )
    );
%% Catch-all: pass unrecognized reasons through unchanged.
format_error(Reason) ->
    Reason.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%% Create the backup work-dir name ("emqx-export-<local datetime>.<ms>")
%% and open the target tar.gz for writing. Returns {WorkDirName, TarFd}.
prepare_new_backup(Opts) ->
    Ts = erlang:system_time(millisecond),
    {{Y, M, D}, {HH, MM, SS}} = local_datetime(Ts),
    BackupBaseName = str(
        io_lib:format(
            "emqx-export-~0p-~2..0b-~2..0b-~2..0b-~2..0b-~2..0b.~3..0b",
            [Y, M, D, HH, MM, SS, Ts rem 1000]
        )
    ),
    BackupName = filename:join(root_backup_dir(), BackupBaseName),
    BackupTarName = ?tar(BackupName),
    maybe_print("Exporting data to ~p...~n", [BackupTarName], Opts),
    %% Assertive match: any tar open failure crashes export/1's try block.
    {ok, TarDescriptor} = ?fmt_tar_err(erl_tar:open(BackupTarName, [write, compressed])),
    {BackupName, TarDescriptor}.

%% Write META.hocon (version/edition), cluster.hocon and per-table mnesia
%% backups into the tar, close it, and return the backup file info.
do_export(BackupName, TarDescriptor, Opts) ->
    BackupBaseName = filename:basename(BackupName),
    BackupTarName = ?tar(BackupName),
    Meta = #{
        version => emqx_release:version(),
        edition => emqx_release:edition()
    },
    MetaBin = bin(hocon_pp:do(Meta, #{})),
    MetaFileName = filename:join(BackupBaseName, ?META_FILENAME),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MetaBin, MetaFileName, [])),
    ok = export_cluster_hocon(TarDescriptor, BackupBaseName, Opts),
    ok = export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts),
    ok = ?fmt_tar_err(erl_tar:close(TarDescriptor)),
    %% Report size and creation time of the finished archive.
    {ok, #file_info{
        size = Size,
        ctime = {{Y1, M1, D1}, {H1, MM1, S1}}
    }} = file:read_file_info(BackupTarName),
    CreatedAt = io_lib:format("~p-~p-~p ~p:~p:~p", [Y1, M1, D1, H1, MM1, S1]),
    {ok, #{
        filename => bin(BackupTarName),
        size => Size,
        created_at => bin(CreatedAt),
        node => node()
    }}.
%% Add the cluster override config (as cluster.hocon) to the archive.
%% File references under data_dir are inlined first (see read_data_files/1).
export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) ->
    maybe_print("Exporting cluster configuration...~n", [], Opts),
    RawConf = emqx_config:read_override_conf(#{override_to => cluster}),
    maybe_print(
        "Exporting additional files from EMQX data_dir: ~p...~n", [str(emqx:data_dir())], Opts
    ),
    RawConf1 = read_data_files(RawConf),
    RawConfBin = bin(hocon_pp:do(RawConf1, #{})),
    NameInArchive = filename:join(BackupBaseName, ?CLUSTER_HOCON_FILENAME),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, RawConfBin, NameInArchive, [])).

%% Add one mnesia backup file per table registered via emqx_db_backup.
export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) ->
    maybe_print("Exporting built-in database...~n", [], Opts),
    lists:foreach(
        fun(Tab) -> export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts) end,
        tabs_to_backup()
    ).

%% Dump a single table to a temp file, add it to the tar, then delete it.
export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
    maybe_print("Exporting ~p database table...~n", [TabName], Opts),
    {ok, MnesiaBackupName} = do_export_mnesia_tab(TabName, BackupName),
    NameInArchive = mnesia_backup_name(BackupBaseName, TabName),
    ok = ?fmt_tar_err(erl_tar:add(TarDescriptor, MnesiaBackupName, NameInArchive, [])),
    _ = file:delete(MnesiaBackupName),
    ok.

%% Take a local-only mnesia checkpoint of TabName and back it up to disk.
%% The checkpoint is always deactivated, even if the backup step fails.
do_export_mnesia_tab(TabName, BackupName) ->
    Node = node(),
    try
        {ok, TabName, [Node]} = mnesia:activate_checkpoint(
            [{name, TabName}, {min, [TabName]}, {allow_remote, false}]
        ),
        MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
        ok = filelib:ensure_dir(MnesiaBackupName),
        ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
        {ok, MnesiaBackupName}
    after
        mnesia:deactivate_checkpoint(TabName)
    end.

-ifdef(TEST).
tabs_to_backup() ->
    %% Allow mocking in tests
    ?MODULE:mnesia_tabs_to_backup().
-else.
tabs_to_backup() ->
    mnesia_tabs_to_backup().
-endif.

%% Collect tables from all modules implementing the emqx_db_backup behaviour.
mnesia_tabs_to_backup() ->
    lists:flatten([M:backup_tables() || M <- find_behaviours(emqx_db_backup)]).

%% Path of a table's backup file inside a backup dir/archive.
mnesia_backup_name(Path, TabName) ->
    filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]).

%% Import is restricted to mria core nodes.
is_import_allowed() ->
    mria_rlog:role() =:= core.
%% Parse META.hocon from an extracted backup dir and validate its
%% edition/version fields. Returns `{ok, MetaMap}' or `{error, Reason}'.
validate_backup(BackupDir) ->
    case hocon:files([filename:join(BackupDir, ?META_FILENAME)]) of
        %% NOTE: Meta is bound to the whole `{ok, Map}' tuple and is what
        %% validate/2 returns on success.
        {ok, #{
            <<"edition">> := Edition,
            <<"version">> := Version
        }} = Meta ->
            validate(
                [
                    fun() -> check_edition(Edition) end,
                    fun() -> check_version(Version) end
                ],
                Meta
            );
        _ ->
            ?SLOG(error, #{msg => "missing_backup_meta", backup => BackupDir}),
            {error, missing_backup_meta}
    end.

%% Run validators in order; return OkRes if all pass, else the first error.
validate([ValidatorFun | T], OkRes) ->
    case ValidatorFun() of
        ok -> validate(T, OkRes);
        Err -> Err
    end;
validate([], OkRes) ->
    OkRes.

%% Enterprise backups cannot be imported into a Community edition node;
%% every other ce/ee combination is allowed.
check_edition(BackupEdition) when BackupEdition =:= <<"ce">>; BackupEdition =:= <<"ee">> ->
    Edition = bin(emqx_release:edition()),
    case {BackupEdition, Edition} of
        {<<"ee">>, <<"ce">>} ->
            {error, ee_to_ce_backup};
        _ ->
            ok
    end;
check_edition(BackupEdition) ->
    ?SLOG(error, #{msg => "invalid_backup_edition", edition => BackupEdition}),
    {error, invalid_edition}.
%% Reject importing a backup produced by a NEWER EMQX release than the
%% running one; patch versions are ignored.
%% BUG FIX: the original compared majors and minors independently
%% (`ImportMajor > Major orelse ImportMinor > Minor'), which wrongly rejected
%% e.g. a 5.9 backup imported into 6.0. Use lexicographic {Major, Minor}
%% tuple comparison instead.
check_version(ImportVersion) ->
    case parse_version_no_patch(ImportVersion) of
        {ok, {ImportMajorInt, ImportMinorInt}} ->
            Version = emqx_release:version(),
            {ok, {MajorInt, MinorInt}} = parse_version_no_patch(bin(Version)),
            case {ImportMajorInt, ImportMinorInt} > {MajorInt, MinorInt} of
                true ->
                    %% 4.x backup files are anyway not compatible and will be treated as invalid,
                    %% before this step,
                    {error, {unsupported_version, str(ImportVersion)}};
                false ->
                    ok
            end;
        Err ->
            Err
    end.
%% Parse "Major.Minor[.Patch...]" into {ok, {MajorInt, MinorInt}}, ignoring
%% everything past the minor component; `{error, invalid_version}' otherwise.
parse_version_no_patch(VersionBin) ->
    case string:split(VersionBin, ".", all) of
        [Major, Minor | _] ->
            %% bin_to_int returns {Int, Rest}; the rest is ignored here.
            {MajorInt, _} = emqx_utils_binary:bin_to_int(Major),
            {MinorInt, _} = emqx_utils_binary:bin_to_int(Minor),
            {ok, {MajorInt, MinorInt}};
        _ ->
            ?SLOG(error, #{msg => "failed_to_parse_backup_version", version => VersionBin}),
            {error, invalid_version}
    end.
%% Extract the archive next to the root backup dir, validate it, then import
%% cluster config and mnesia tables. Each `ok = ' / `{ok, _} = ' step uses
%% assertive matching: a failing step raises `{badmatch, {error, Reason}}',
%% which the first catch clause converts back into `{error, Reason}'.
%% The extracted work dir is always removed afterwards.
do_import(BackupFileName, Opts) ->
    BackupDir = filename:join(root_backup_dir(), filename:basename(BackupFileName, ?TAR_SUFFIX)),
    maybe_print("Importing data from ~p...~n", [BackupFileName], Opts),
    try
        ok = validate_backup_name(BackupFileName),
        ok = extract_backup(BackupFileName),
        {ok, _} = validate_backup(BackupDir),
        ConfErrors = import_cluster_hocon(BackupDir, Opts),
        MnesiaErrors = import_mnesia_tabs(BackupDir, Opts),
        ?SLOG(info, #{msg => "emqx_data_import_success"}),
        {ok, #{db_errors => MnesiaErrors, config_errors => ConfErrors}}
    catch
        error:{badmatch, {error, Reason}}:Stack ->
            ?SLOG(error, #{msg => "emqx_data_import_failed", reason => Reason, stacktrace => Stack}),
            {error, Reason};
        Class:Reason:Stack ->
            ?SLOG(error, #{
                msg => "emqx_data_import_failed",
                exception => Class,
                reason => Reason,
                stacktrace => Stack
            }),
            {error, Reason}
    after
        file:del_dir_r(BackupDir)
    end.
%% Restore every registered table, collecting only the failed ones into a
%% `#{Table => {error, _}}' map (successes are filtered out).
import_mnesia_tabs(BackupDir, Opts) ->
    maybe_print("Importing built-in database...~n", [], Opts),
    filter_errors(
        lists:foldr(
            fun(Tab, Acc) -> Acc#{Tab => import_mnesia_tab(BackupDir, Tab, Opts)} end,
            #{},
            tabs_to_backup()
        )
    ).

%% Restore a single table if its backup file is present in the archive;
%% a missing file is not an error (logged and reported as `ok').
import_mnesia_tab(BackupDir, TabName, Opts) ->
    MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
    case filelib:is_regular(MnesiaBackupFileName) of
        true ->
            maybe_print("Importing ~p database table...~n", [TabName], Opts),
            restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts);
        false ->
            maybe_print("No backup file for ~p database table...~n", [TabName], Opts),
            ?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}),
            ok
    end.
%% Rewrite a table's backup for this node (via backup_converter/2), then
%% restore it with mnesia keeping existing tables. Returns ok or {error, _}.
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) ->
    BackupNameToImport = MnesiaBackupFileName ++ "_for_import",
    %% Old-style `catch': also captures the throw({error, different_table_schema})
    %% raised by check_rec_attributes/2 inside the traverse fun, so Prepared is
    %% either {ok, _} or an error/'EXIT' term handled below.
    Prepared =
        catch mnesia:traverse_backup(
            MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0
        ),
    try
        case Prepared of
            {ok, _} ->
                %% keep_tables: do not delete existing tables that are absent
                %% from the backup.
                Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]),
                case Restored of
                    {atomic, [TabName]} ->
                        ok;
                    RestoreErr ->
                        ?SLOG(error, #{
                            msg => "failed_to_restore_mnesia_backup",
                            table => TabName,
                            backup => BackupDir,
                            reason => RestoreErr
                        }),
                        maybe_print_mnesia_import_err(TabName, RestoreErr, Opts),
                        {error, RestoreErr}
                end;
            PrepareErr ->
                ?SLOG(error, #{
                    msg => "failed_to_prepare_mnesia_backup_for_restoring",
                    table => TabName,
                    backup => BackupDir,
                    reason => PrepareErr
                }),
                maybe_print_mnesia_import_err(TabName, PrepareErr, Opts),
                PrepareErr
        end
    after
        %% Cleanup files as soon as they are not needed any more for more efficient disk usage
        _ = file:delete(BackupNameToImport),
        _ = file:delete(MnesiaBackupFileName)
    end.
%% mnesia:traverse_backup/4 callback: verify schema records against the
%% local table and retarget storage-copy node lists; pass data records as-is.
backup_converter({schema, Tab, CreateList}, Acc) ->
    check_rec_attributes(Tab, CreateList),
    {[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc};
backup_converter(Other, Acc) ->
    {[Other], Acc}.

%% Throws {error, different_table_schema} when the backed-up record
%% attributes differ from the local table definition (caught by the
%% old-style `catch' around mnesia:traverse_backup/4).
check_rec_attributes(Tab, CreateList) ->
    ImportAttributes = proplists:get_value(attributes, CreateList),
    Attributes = mnesia:table_info(Tab, attributes),
    case ImportAttributes =/= Attributes of
        true ->
            throw({error, different_table_schema});
        false ->
            ok
    end.

%% Replace the origin nodes of any non-empty copies list with this node.
convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies ->
    {K, [node()]};
convert_copies(Other) ->
    Other.
%% Extract the archive into the root backup dir after checking that every
%% member path stays under the backup-name directory (no stray entries).
extract_backup(BackupFileName) ->
    BackupDir = root_backup_dir(),
    ok = validate_filenames(BackupFileName),
    ?fmt_tar_err(erl_tar:extract(BackupFileName, [{cwd, BackupDir}, compressed])).

%% Every file listed in the tar must have `<backup name>' as its first path
%% component; otherwise return {error, bad_archive_dir}.
validate_filenames(BackupFileName) ->
    {ok, FileNames} = ?fmt_tar_err(erl_tar:table(BackupFileName, [compressed])),
    BackupName = filename:basename(BackupFileName, ?TAR_SUFFIX),
    IsValid = lists:all(
        fun(FileName) ->
            [Root | _] = filename:split(FileName),
            Root =:= BackupName
        end,
        FileNames
    ),
    case IsValid of
        true -> ok;
        false -> {error, bad_archive_dir}
    end.
%% Import cluster.hocon from the extracted backup (if present) and return a
%% map of config-path => error for roots that failed to apply.
%% A validation failure raises badmatch and aborts the whole import
%% (handled by do_import/2); a missing file yields an empty error map.
import_cluster_hocon(BackupDir, Opts) ->
    HoconFileName = filename:join(BackupDir, ?CLUSTER_HOCON_FILENAME),
    case filelib:is_regular(HoconFileName) of
        true ->
            {ok, RawConf} = hocon:files([HoconFileName]),
            {ok, _} = validate_cluster_hocon(RawConf),
            maybe_print("Importing cluster configuration...~n", [], Opts),
            %% At this point, when all validations have been passed, we want to log errors (if any)
            %% but proceed with the next items, instead of aborting the whole import operation
            do_import_conf(RawConf, Opts);
        false ->
            maybe_print("No cluster configuration to be imported.~n", [], Opts),
            ?SLOG(info, #{
                msg => "no_backup_hocon_config_to_import",
                backup => BackupDir
            }),
            #{}
    end.
%% Inline external file references into the exported raw config: any string
%% value pointing into data_dir (relative or absolute) is replaced with the
%% file's content, so the backup is self-contained.
read_data_files(RawConf) ->
    DataDir = bin(emqx:data_dir()),
    {ok, Cwd} = file:get_cwd(),
    AbsDataDir = bin(filename:join(Cwd, DataDir)),
    RawConf1 = emqx_authz:maybe_read_acl_file(RawConf),
    emqx_utils_maps:deep_convert(RawConf1, fun read_data_file/4, [DataDir, AbsDataDir]).

%% Binary-prefix pattern: matches any binary starting with _Dir_.
-define(dir_pattern(_Dir_), <<_Dir_:(byte_size(_Dir_))/binary, _/binary>>).

%% deep_convert callback: swap data_dir-prefixed path values for file content.
read_data_file(Key, Val, DataDir, AbsDataDir) ->
    Val1 =
        case Val of
            ?dir_pattern(DataDir) = FileName ->
                do_read_file(FileName);
            ?dir_pattern(AbsDataDir) = FileName ->
                do_read_file(FileName);
            V ->
                V
        end,
    {Key, Val1}.

%% Best-effort read: on failure, log a warning and keep the original
%% file-name value in the exported config.
do_read_file(FileName) ->
    case file:read_file(FileName) of
        {ok, Content} ->
            Content;
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "failed_to_read_data_file",
                filename => FileName,
                reason => Reason
            }),
            FileName
    end.

%% Check the imported raw config (merged over the current one) against the
%% full EMQX config schema before applying anything.
validate_cluster_hocon(RawConf) ->
    %% write ACL file to comply with the schema...
    RawConf1 = emqx_authz:maybe_write_acl_file(RawConf),
    emqx_hocon:check(
        emqx_conf:schema_module(),
        maps:merge(emqx:get_raw_config([]), RawConf1),
        #{atom_key => false, required => false}
    ).
%% Apply the imported raw config: first the generic (?CONF_KEYS) roots, then
%% every module implementing the `emqx_config_backup' behaviour.
%% Returns a map of config-path => error reason for all failed roots.
%% BUG FIX: the original called Module:import_config(RawConf) twice per
%% module (the first result was discarded), executing the side-effecting
%% config import twice. Call it exactly once.
do_import_conf(RawConf, Opts) ->
    GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
    maybe_print_errors(GenConfErrs, Opts),
    Errors =
        lists:foldr(
            fun(Module, ErrorsAcc) ->
                case Module:import_config(RawConf) of
                    {ok, #{changed := Changed}} ->
                        maybe_print_changed(Changed, Opts),
                        ErrorsAcc;
                    {error, #{root_key := RootKey, reason := Reason}} ->
                        ErrorsAcc#{[RootKey] => Reason}
                end
            end,
            GenConfErrs,
            find_behaviours(emqx_config_backup)
        ),
    maybe_print_errors(Errors, Opts),
    Errors.
%% Apply each generic config root from ?CONF_KEYS that is present in the
%% imported data. Returns [{Path, Result}] where Result is `ok' (key absent),
%% `{ok, _}' or `{error, _}' from emqx_conf:update/3.
import_generic_conf(Data) ->
    lists:map(
        fun(Key) ->
            case maps:get(Key, Data, undefined) of
                undefined -> {[Key], ok};
                Conf -> {[Key], emqx_conf:update([Key], Conf, #{override_to => cluster})}
            end
        end,
        ?CONF_KEYS
    ).

%% Report (via print_fun, if any) config paths that existed before the
%% import and were overwritten by it.
maybe_print_changed(Changed, Opts) ->
    lists:foreach(
        fun(ChangedPath) ->
            maybe_print(
                "Config key path ~p was present before import and "
                "has been overwritten.~n",
                [pretty_path(ChangedPath)],
                Opts
            )
        end,
        Changed
    ).

%% Report (via print_fun, if any) every failed config path with its reason.
maybe_print_errors(Errors, Opts) ->
    maps:foreach(
        fun(Path, Err) ->
            maybe_print(
                "Failed to import the following config path: ~p, reason: ~p~n",
                [pretty_path(Path), Err],
                Opts
            )
        end,
        Errors
    ).
%% Keep only the entries whose value is an `{error, _}' tuple.
filter_errors(Results) ->
    maps:filter(fun is_error_entry/2, Results).

%% True for `{error, _}' values, false otherwise.
is_error_entry(_Path, {error, _}) -> true;
is_error_entry(_Path, _Ok) -> false.

%% Render a config path (list of atoms/strings/binaries) as "a.b.c".
pretty_path(Path) ->
    str(lists:join(".", lists:map(fun str/1, Path))).

%% Convert an atom or any chardata to a character list.
str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom);
str(Chars) ->
    unicode:characters_to_list(Chars).

%% Convert an atom or any chardata to a UTF-8 binary.
bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
bin(Chars) ->
    unicode:characters_to_binary(Chars).
%% A backup file name is valid iff its base name ends with ".tar.gz".
validate_backup_name(FileName) ->
    WithoutSuffix = filename:basename(FileName, ?TAR_SUFFIX),
    Expected = WithoutSuffix ++ ?TAR_SUFFIX,
    case filename:basename(FileName) =:= Expected of
        true -> ok;
        false -> {error, bad_backup_name}
    end.
%% Resolve a backup file: use the name as a path if it exists; otherwise, if
%% it is a bare base name (no directory part), look it up inside the root
%% backup dir. Anything else is {error, not_found}.
lookup_file(FileName) ->
    case filelib:is_regular(FileName) of
        true ->
            {ok, FileName};
        false ->
            %% Only lookup by basename, don't allow to lookup by file path
            case FileName =:= filename:basename(FileName) of
                true ->
                    FilePath = filename:join(root_backup_dir(), FileName),
                    case filelib:is_file(FilePath) of
                        true -> {ok, FilePath};
                        false -> {error, not_found}
                    end;
                false ->
                    {error, not_found}
            end
    end.
%% Backups live in <data_dir>/backup; the directory is created on demand.
root_backup_dir() ->
    Dir = filename:join(emqx:data_dir(), ?ROOT_BACKUP_DIR),
    ok = ensure_path(Dir),
    Dir.

%% filelib:ensure_path/1 appeared in OTP 25; emulate it on older releases.
-if(?OTP_RELEASE < 25).
ensure_path(Path) -> filelib:ensure_dir(filename:join([Path, "dummy"])).
-else.
ensure_path(Path) -> filelib:ensure_path(Path).
-endif.

%% Millisecond system time -> local {{Y,M,D},{H,M,S}}.
local_datetime(MillisecondTs) ->
    calendar:system_time_to_local_time(MillisecondTs, millisecond).

%% Progress output is produced only when Opts carries a `print_fun'
%% (e.g. set by the CLI); otherwise it is a no-op.
maybe_print(Format, Args, #{print_fun := PrintFun}) ->
    PrintFun(Format, Args);
maybe_print(_Format, _Args, _Opts) ->
    ok.

maybe_print_mnesia_import_err(TabName, Error, Opts) ->
    maybe_print(
        "[error] Failed to import built-in database table: ~p, reason: ~p~n",
        [TabName, Error],
        Opts
    ).
%% Find all modules in loaded emqx* applications that declare the given
%% behaviour (accepting both `behavior' and `behaviour' attribute spellings).
find_behaviours(Behaviour) ->
    find_behaviours(Behaviour, apps(), []).

%% Based on minirest_api:find_api_modules/1
find_behaviours(_Behaviour, [] = _Apps, Acc) ->
    Acc;
find_behaviours(Behaviour, [App | Apps], Acc) ->
    case application:get_key(App, modules) of
        undefined ->
            Acc;
        {ok, Modules} ->
            NewAcc = lists:filter(
                fun(Module) ->
                    Info = Module:module_info(attributes),
                    Bhvrs = lists:flatten(
                        proplists:get_all_values(behavior, Info) ++
                            proplists:get_all_values(behaviour, Info)
                    ),
                    lists:member(Behaviour, Bhvrs)
                end,
                Modules
            ),
            find_behaviours(Behaviour, Apps, NewAcc ++ Acc)
    end.

%% All loaded applications whose name starts with "emqx".
apps() ->
    [
        App
     || {App, _, _} <- application:loaded_applications(),
        case re:run(atom_to_list(App), "^emqx") of
            {match, [{0, 4}]} -> true;
            _ -> false
        end
    ].

View File

@ -0,0 +1,96 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_listeners_conf).
-behaviour(emqx_config_backup).
-export([
action/4,
create/3,
ensure_remove/2,
get_raw/2,
update/3
]).
%% Data backup
-export([
import_config/1
]).
-include_lib("emqx/include/logger.hrl").
-define(CONF_ROOT_KEY, listeners).
-define(path(_Type_, _Name_), [?CONF_ROOT_KEY, _Type_, _Name_]).
-define(OPTS, #{rawconf_with_defaults => true, override_to => cluster}).
-define(IMPORT_OPTS, #{override_to => cluster}).
%% Run a listener action (start/stop/restart payload) via config update.
action(Type, Name, Action, Conf) ->
    wrap(emqx_conf:update(?path(Type, Name), {action, Action, Conf}, ?OPTS)).

%% Create a new listener of the given type/name.
create(Type, Name, Conf) ->
    wrap(emqx_conf:update(?path(Type, Name), {create, Conf}, ?OPTS)).

%% Remove a listener, leaving a tombstone so the removal replicates.
ensure_remove(Type, Name) ->
    wrap(emqx_conf:tombstone(?path(Type, Name), ?OPTS)).

%% Raw (unchecked) listener config, or `undefined' if absent.
get_raw(Type, Name) -> emqx_conf:get_raw(?path(Type, Name), undefined).

%% Update an existing listener's config.
update(Type, Name, Conf) ->
    wrap(emqx_conf:update(?path(Type, Name), {update, Conf}, ?OPTS)).

%% Unwrap emqx_listeners pre/post config-update hook errors to the bare reason.
wrap({error, {post_config_update, emqx_listeners, Reason}}) -> {error, Reason};
wrap({error, {pre_config_update, emqx_listeners, Reason}}) -> {error, Reason};
wrap({error, Reason}) -> {error, Reason};
wrap(Ok) -> Ok.
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
%% @doc emqx_config_backup callback: import the `listeners' root from a
%% backup's raw config. The current listeners are merged with the imported
%% ones (imported values win per listener); on success, reports the list of
%% changed listener config paths.
import_config(RawConf) ->
    NewConf = maps:get(<<"listeners">>, RawConf, #{}),
    OldConf = emqx:get_raw_config([?CONF_ROOT_KEY], #{}),
    MergedConf = merge_confs(OldConf, NewConf),
    case emqx_conf:update([?CONF_ROOT_KEY], MergedConf, ?IMPORT_OPTS) of
        {ok, #{raw_config := NewRawConf}} ->
            {ok, #{root_key => ?CONF_ROOT_KEY, changed => changed_paths(OldConf, NewRawConf)}};
        Error ->
            {error, #{root_key => ?CONF_ROOT_KEY, reason => Error}}
    end.
%% Merge old and imported listener configs per listener type; when the same
%% listener name exists in both, the imported (new) conf wins.
merge_confs(OldConf, NewConf) ->
    AllTypes = maps:keys(maps:merge(OldConf, NewConf)),
    maps:from_list([
        {Type,
            maps:merge(
                maps:get(Type, OldConf, #{}),
                maps:get(Type, NewConf, #{})
            )}
     || Type <- AllTypes
    ]).
%% Compute the listener config paths whose values differ between the old raw
%% config and the raw config that resulted from the import.
changed_paths(OldRawConf, NewRawConf) ->
    maps:fold(
        fun(Type, Listeners, ChangedAcc) ->
            OldListeners = maps:get(Type, OldRawConf, #{}),
            %% diff_maps/2 reports per-key changes; only `changed' keys count.
            Changed = maps:get(changed, emqx_utils_maps:diff_maps(Listeners, OldListeners)),
            [?path(Type, K) || K <- maps:keys(Changed)] ++ ChangedAcc
        end,
        [],
        NewRawConf
    ).

View File

@ -0,0 +1,519 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_data_backup_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz").
%% Run every t_* function in this module as a test case.
all() ->
    emqx_common_test_helpers:all(?MODULE).
%% Pre-load every application (without starting it) so app env and
%% schemas are available to all test cases; results of load are ignored,
%% as in the original list comprehension.
init_per_suite(Config) ->
    lists:foreach(fun application:load/1, apps_to_start() ++ apps_to_load()),
    Config.
%% Nothing to tear down at suite level: apps are started/stopped per testcase.
end_per_suite(_Config) ->
    ok.
%% t_import_on_cluster: stub listener/gateway config import with meck so
%% that importing the same backup into the spawned cluster does not try
%% to bind the same listener ports twice.
init_per_testcase(t_import_on_cluster, Config) ->
    %% Don't import listeners to avoid port conflicts
    %% when the same conf will be imported to another cluster
    meck:new(emqx_mgmt_listeners_conf, [passthrough]),
    meck:new(emqx_gateway_conf, [passthrough]),
    meck:expect(
        emqx_mgmt_listeners_conf,
        import_config,
        1,
        {ok, #{changed => [], root_key => listeners}}
    ),
    meck:expect(
        emqx_gateway_conf,
        import_config,
        1,
        {ok, #{changed => [], root_key => gateway}}
    ),
    [{cluster, cluster(Config)} | setup(Config)];
%% This case needs a 3-node cluster in addition to the locally started apps.
init_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
    [{cluster, cluster(Config)} | setup(Config)];
%% Restrict the backup to the ad-hoc `data_backup_test' table so the
%% schema-mismatch scenario can be simulated in isolation.
init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
    meck:new(emqx_mgmt_data_backup, [passthrough]),
    meck:expect(emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]),
    setup(Config);
init_per_testcase(_TestCase, Config) ->
    setup(Config).
%% Tear down in reverse order of setup: stop cluster nodes (if any), stop
%% the local apps, then unload any meck stubs installed for the test case.
end_per_testcase(t_import_on_cluster, Config) ->
    cleanup_cluster(?config(cluster, Config)),
    cleanup(Config),
    meck:unload(emqx_mgmt_listeners_conf),
    meck:unload(emqx_gateway_conf);
end_per_testcase(t_verify_imported_mnesia_tab_on_cluster, Config) ->
    cleanup_cluster(?config(cluster, Config)),
    cleanup(Config);
end_per_testcase(t_mnesia_bad_tab_schema, Config) ->
    cleanup(Config),
    meck:unload(emqx_mgmt_data_backup);
end_per_testcase(_TestCase, Config) ->
    cleanup(Config).
%% Exporting the default (unmodified) config and importing it back must
%% be a no-op, and repeating the import must be idempotent.
t_empty_export_import(_Config) ->
    RawConfBefore = emqx:get_raw_config([]),
    {ok, #{filename := BackupFile}} = emqx_mgmt_data_backup:export(),
    ExpectedRes = {ok, #{db_errors => #{}, config_errors => #{}}},
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(BackupFile)),
    ?assertEqual(RawConfBefore, emqx:get_raw_config([])),
    %% importing the same backup a second time must change nothing
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(BackupFile)),
    ?assertEqual(RawConfBefore, emqx:get_raw_config([])).
%% Import a known bootstrap backup, then verify that exporting the new
%% state and importing it again round-trips, is idempotent, and that a
%% bare file basename is resolved inside <data_dir>/backup.
t_cluster_hocon_export_import(Config) ->
    ConfBeforeImport = emqx:get_raw_config([]),
    BootstrapBackup = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),
    ExpectedRes = {ok, #{db_errors => #{}, config_errors => #{}}},
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(BootstrapBackup)),
    ConfAfterImport = emqx:get_raw_config([]),
    ?assertNotEqual(ConfBeforeImport, ConfAfterImport),
    {ok, #{filename := ExportedFile}} = emqx_mgmt_data_backup:export(),
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(ExportedFile)),
    ?assertEqual(ConfAfterImport, emqx:get_raw_config([])),
    %% importing the same backup a second time must change nothing
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(ExportedFile)),
    ?assertEqual(ConfAfterImport, emqx:get_raw_config([])),
    %% lookup file inside <data_dir>/backup
    ?assertEqual(ExpectedRes, emqx_mgmt_data_backup:import(filename:basename(ExportedFile))).
%% A backup taken from an enterprise-edition node must be rejected on a
%% community-edition node with `ee_to_ce_backup', and the reason must be
%% translatable into a readable string.
t_ee_to_ce_backup(Config) ->
    case emqx_release:edition() of
        ce ->
            EEBackupFileName = filename:join(?config(priv_dir, Config), "export-backup-ee.tar.gz"),
            %% Forge an archive whose META.hocon claims edition `ee'
            Meta = unicode:characters_to_binary(
                hocon_pp:do(#{edition => ee, version => emqx_release:version()}, #{})
            ),
            ok = erl_tar:create(
                EEBackupFileName,
                [
                    {"export-backup-ee/cluster.hocon", <<>>},
                    {"export-backup-ee/META.hocon", Meta}
                ],
                [compressed]
            ),
            ExpReason = ee_to_ce_backup,
            ?assertEqual(
                {error, ExpReason}, emqx_mgmt_data_backup:import(EEBackupFileName)
            ),
            %% Must be translated to a readable string
            ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason));
        ee ->
            %% Don't fail if the test is run with emqx-enterprise profile
            ok
    end.
%% Importing a non-existent backup file must fail with `not_found', and
%% the reason must be translatable into a readable string.
%% Fix: use the bound `ExpReason' in both assertions instead of
%% duplicating the literal atom, so the two checks cannot drift apart.
t_no_backup_file(_Config) ->
    ExpReason = not_found,
    ?assertEqual(
        {error, ExpReason}, emqx_mgmt_data_backup:import("no_such_backup.tar.gz")
    ),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpReason)).
%% Every malformed backup archive must be rejected with a distinct error
%% reason, and every reason must format to a readable string:
%% - a file name without the .tar.gz suffix,
%% - an archive with no META.hocon entry,
%% - an archive whose entries are not all under one top-level directory,
%% - a META.hocon with an unknown edition,
%% - a META.hocon with an unparsable version.
t_bad_backup_file(Config) ->
    %% not a *.tar.gz name at all
    BadFileName = filename:join(?config(priv_dir, Config), "export-bad-backup-tar-gz"),
    ok = file:write_file(BadFileName, <<>>),
    %% valid layout but no META.hocon
    NoMetaFileName = filename:join(?config(priv_dir, Config), "export-no-meta.tar.gz"),
    ok = erl_tar:create(NoMetaFileName, [{"export-no-meta/cluster.hocon", <<>>}], [compressed]),
    %% entries scattered across several top-level dirs (incl. an absolute path)
    BadArchiveDirFileName = filename:join(?config(priv_dir, Config), "export-bad-dir.tar.gz"),
    ok = erl_tar:create(
        BadArchiveDirFileName,
        [
            {"tmp/cluster.hocon", <<>>},
            {"export-bad-dir-inside/META.hocon", <<>>},
            {"/export-bad-dir-inside/mnesia/test_tab", <<>>}
        ],
        [compressed]
    ),
    %% META.hocon with an edition that is neither ce nor ee
    InvalidEditionFileName = filename:join(
        ?config(priv_dir, Config), "export-invalid-edition.tar.gz"
    ),
    Meta = unicode:characters_to_binary(
        hocon_pp:do(#{edition => "test", version => emqx_release:version()}, #{})
    ),
    ok = erl_tar:create(
        InvalidEditionFileName,
        [
            {"export-invalid-edition/cluster.hocon", <<>>},
            {"export-invalid-edition/META.hocon", Meta}
        ],
        [compressed]
    ),
    %% META.hocon with a version string that cannot be parsed
    InvalidVersionFileName = filename:join(
        ?config(priv_dir, Config), "export-invalid-version.tar.gz"
    ),
    Meta1 = unicode:characters_to_binary(
        hocon_pp:do(#{edition => emqx_release:edition(), version => "test"}, #{})
    ),
    ok = erl_tar:create(
        InvalidVersionFileName,
        [
            {"export-invalid-version/cluster.hocon", <<>>},
            {"export-invalid-version/META.hocon", Meta1}
        ],
        [compressed]
    ),
    BadFileNameReason = bad_backup_name,
    NoMetaReason = missing_backup_meta,
    BadArchiveDirReason = bad_archive_dir,
    InvalidEditionReason = invalid_edition,
    InvalidVersionReason = invalid_version,
    ?assertEqual({error, BadFileNameReason}, emqx_mgmt_data_backup:import(BadFileName)),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadFileNameReason)),
    ?assertEqual({error, NoMetaReason}, emqx_mgmt_data_backup:import(NoMetaFileName)),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(NoMetaReason)),
    ?assertEqual(
        {error, BadArchiveDirReason},
        emqx_mgmt_data_backup:import(BadArchiveDirFileName)
    ),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(BadArchiveDirReason)),
    ?assertEqual(
        {error, InvalidEditionReason},
        emqx_mgmt_data_backup:import(InvalidEditionFileName)
    ),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidEditionReason)),
    ?assertEqual(
        {error, InvalidVersionReason},
        emqx_mgmt_data_backup:import(InvalidVersionFileName)
    ),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(InvalidVersionReason)).
%% Backups created by a newer EMQX release (bumped major or minor
%% version in META.hocon) must be rejected with {unsupported_version, V},
%% and the reason must format to a readable string.
t_future_version(Config) ->
    CurrentVersion = list_to_binary(emqx_release:version()),
    %% keep the patch tail (everything after Major.Minor) as-is
    [_, _ | Patch] = string:split(CurrentVersion, ".", all),
    {ok, {MajorInt, MinorInt}} = emqx_mgmt_data_backup:parse_version_no_patch(CurrentVersion),
    FutureMajorVersion = recompose_version(MajorInt + 1, MinorInt, Patch),
    FutureMinorVersion = recompose_version(MajorInt, MinorInt + 1, Patch),
    [MajorMeta, MinorMeta] =
        [
            unicode:characters_to_binary(
                hocon_pp:do(#{edition => emqx_release:edition(), version => V}, #{})
            )
         || V <- [FutureMajorVersion, FutureMinorVersion]
        ],
    MajorFileName = filename:join(?config(priv_dir, Config), "export-future-major-ver.tar.gz"),
    MinorFileName = filename:join(?config(priv_dir, Config), "export-future-minor-ver.tar.gz"),
    ok = erl_tar:create(
        MajorFileName,
        [
            {"export-future-major-ver/cluster.hocon", <<>>},
            {"export-future-major-ver/META.hocon", MajorMeta}
        ],
        [compressed]
    ),
    ok = erl_tar:create(
        MinorFileName,
        [
            {"export-future-minor-ver/cluster.hocon", <<>>},
            {"export-future-minor-ver/META.hocon", MinorMeta}
        ],
        [compressed]
    ),
    ExpMajorReason = {unsupported_version, FutureMajorVersion},
    ExpMinorReason = {unsupported_version, FutureMinorVersion},
    ?assertEqual({error, ExpMajorReason}, emqx_mgmt_data_backup:import(MajorFileName)),
    ?assertEqual({error, ExpMinorReason}, emqx_mgmt_data_backup:import(MinorFileName)),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMajorReason)),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ExpMinorReason)).
%% A backup whose cluster.hocon fails schema validation must be rejected
%% with a validation error instead of being (partially) applied.
t_bad_config(Config) ->
    ArchiveName = filename:join(?config(priv_dir, Config), "export-bad-config-backup.tar.gz"),
    Meta = unicode:characters_to_binary(
        hocon_pp:do(#{edition => emqx_release:edition(), version => emqx_release:version()}, #{})
    ),
    %% listeners section with a type/name/field unknown to the schema
    InvalidConfMap = #{
        <<"listeners">> =>
            #{
                <<"bad-type">> =>
                    #{<<"bad-name">> => #{<<"bad-field">> => <<"bad-val">>}}
            }
    },
    InvalidConf = unicode:characters_to_binary(hocon_pp:do(InvalidConfMap, #{})),
    ok = erl_tar:create(
        ArchiveName,
        [
            {"export-bad-config-backup/cluster.hocon", InvalidConf},
            {"export-bad-config-backup/META.hocon", Meta}
        ],
        [compressed]
    ),
    ImportRes = emqx_mgmt_data_backup:import(ArchiveName),
    ?assertMatch({error, #{kind := validation_error}}, ImportRes).
%% Importing on a cluster: a replicant node must refuse the import with
%% `not_core_node' (and change nothing), while importing on a core node
%% must propagate the configuration to every node. The `authentication'
%% root is used as the sample key to verify propagation.
t_import_on_cluster(Config) ->
    %% Randomly chosen config key to verify import result additionally
    ?assertEqual([], emqx:get_config([authentication])),
    BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),
    ExpImportRes = {ok, #{db_errors => #{}, config_errors => #{}}},
    ?assertEqual(ExpImportRes, emqx_mgmt_data_backup:import(BootstrapFile)),
    ImportedAuthnConf = emqx:get_config([authentication]),
    ?assertMatch([_ | _], ImportedAuthnConf),
    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
    %% use an absolute path: the cluster nodes run with a different cwd
    {ok, Cwd} = file:get_cwd(),
    AbsFilePath = filename:join(Cwd, FileName),
    [CoreNode1, _CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config),
    ReplImportReason = not_core_node,
    ?assertEqual(
        {error, ReplImportReason},
        rpc:call(ReplicantNode, emqx_mgmt_data_backup, import, [AbsFilePath])
    ),
    ?assertMatch([_ | _], emqx_mgmt_data_backup:format_error(ReplImportReason)),
    %% the rejected replicant import must not have changed any node
    [?assertEqual([], rpc:call(N, emqx, get_config, [[authentication]])) || N <- NodesList],
    ?assertEqual(
        ExpImportRes,
        rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath])
    ),
    %% after a core-node import, all nodes must see the imported authenticators
    [
        ?assertEqual(
            authn_ids(ImportedAuthnConf),
            authn_ids(rpc:call(N, emqx, get_config, [[authentication]]))
        )
     || N <- NodesList
    ].
%% Built-in database data (dashboard admin users) must survive
%% export/import: imported users are merged with users created after the
%% export, and the merged table must be visible on both core nodes and,
%% after replication, on the replicant.
t_verify_imported_mnesia_tab_on_cluster(Config) ->
    UsersToExport = users(<<"user_to_export_">>),
    UsersBeforeImport = users(<<"user_before_import_">>),
    [{ok, _} = emqx_dashboard_admin:add_user(U, U, U) || U <- UsersToExport],
    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
    %% absolute path: the cluster nodes run with a different cwd
    {ok, Cwd} = file:get_cwd(),
    AbsFilePath = filename:join(Cwd, FileName),
    [CoreNode1, CoreNode2, ReplicantNode] = NodesList = ?config(cluster, Config),
    %% users added on the cluster after the export must not be lost by import
    [
        {ok, _} = rpc:call(CoreNode1, emqx_dashboard_admin, add_user, [U, U, U])
     || U <- UsersBeforeImport
    ],
    ?assertEqual(
        {ok, #{db_errors => #{}, config_errors => #{}}},
        rpc:call(CoreNode1, emqx_mgmt_data_backup, import, [AbsFilePath])
    ),
    [Tab] = emqx_dashboard_admin:backup_tables(),
    %% expected key set: exported users (local table) + pre-import users
    AllUsers = lists:sort(mnesia:dirty_all_keys(Tab) ++ UsersBeforeImport),
    [
        ?assertEqual(
            AllUsers,
            lists:sort(rpc:call(N, mnesia, dirty_all_keys, [Tab]))
        )
     || N <- [CoreNode1, CoreNode2]
    ],
    %% Give some extra time to replicant to import data...
    timer:sleep(3000),
    ?assertEqual(AllUsers, lists:sort(rpc:call(ReplicantNode, mnesia, dirty_all_keys, [Tab]))),
    [rpc:call(N, ekka, leave, []) || N <- lists:reverse(NodesList)],
    [emqx_common_test_helpers:stop_slave(N) || N <- NodesList].
%% Restoring a table whose attribute list changed between export and
%% import must fail for that table only: the import reports the schema
%% mismatch under db_errors and leaves the table's current content intact.
t_mnesia_bad_tab_schema(_Config) ->
    OldAttributes = [id, name, description],
    ok = create_test_tab(OldAttributes),
    ok = mria:dirty_write({data_backup_test, <<"id">>, <<"old_name">>, <<"old_description">>}),
    {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
    {atomic, ok} = mnesia:delete_table(data_backup_test),
    %% recreate the same table with one extra attribute
    NewAttributes = [id, name, description, new_field],
    ok = create_test_tab(NewAttributes),
    NewRec =
        {data_backup_test, <<"id">>, <<"new_name">>, <<"new_description">>, <<"new_field_value">>},
    ok = mria:dirty_write(NewRec),
    ?assertEqual(
        {ok, #{
            db_errors =>
                #{data_backup_test => {error, {"Backup traversal failed", different_table_schema}}},
            config_errors => #{}
        }},
        emqx_mgmt_data_backup:import(FileName)
    ),
    %% the record written after the schema change must still be there
    ?assertEqual([NewRec], mnesia:dirty_read(data_backup_test, <<"id">>)),
    ?assertEqual([<<"id">>], mnesia:dirty_all_keys(data_backup_test)).
%% File references in the raw config must be inlined by read_data_files/1:
%% paths under the node's data dir (absolute or relative) are replaced by
%% the file content, while paths outside the data dir are left untouched.
t_read_files(_Config) ->
    DataDir = emqx:data_dir(),
    %% Relative "data" path is set in init_per_testcase/2, asserting it must be safe
    ?assertEqual("data", DataDir),
    {ok, Cwd} = file:get_cwd(),
    AbsDataDir = filename:join(Cwd, DataDir),
    FileBaseName = "t_read_files_tmp_file",
    AbsPath = iolist_to_binary(filename:join(AbsDataDir, FileBaseName)),
    RelPath = iolist_to_binary(filename:join(DataDir, FileBaseName)),
    Content = <<"test_file_content">>,
    ok = file:write_file(AbsPath, Content),
    RawConf = #{
        <<"test_rootkey">> => #{
            <<"test_field">> => <<"test_field_path">>,
            <<"abs_data_dir_path_file">> => AbsPath,
            <<"rel_data_dir_path_file">> => RelPath,
            <<"path_outside_data_dir">> => <<"/tmp/some-file">>
        }
    },
    WithAbsInlined = emqx_utils_maps:deep_put(
        [<<"test_rootkey">>, <<"abs_data_dir_path_file">>], RawConf, Content
    ),
    ExpectedConf = emqx_utils_maps:deep_put(
        [<<"test_rootkey">>, <<"rel_data_dir_path_file">>], WithAbsInlined, Content
    ),
    ?assertEqual(ExpectedConf, emqx_mgmt_data_backup:read_data_files(RawConf)).
%%------------------------------------------------------------------------------
%% Internal test helpers
%%------------------------------------------------------------------------------
%% Start the full local app set, forcing the dashboard HTTP listener onto
%% an ephemeral port, and remember the previous emqx data_dir so that
%% cleanup/1 can restore it afterwards.
setup(Config) ->
    %% avoid port conflicts if the cluster is started
    DashboardOnEphemeralPort = fun
        (emqx_dashboard) ->
            ok = emqx_config:put([dashboard, listeners, http, bind], 0);
        (_App) ->
            ok
    end,
    ok = emqx_common_test_helpers:start_apps(apps_to_start(), DashboardOnEphemeralPort),
    PrevDataDir = application:get_env(emqx, data_dir),
    application:set_env(emqx, data_dir, "data"),
    [{previous_emqx_data_dir, PrevDataDir} | Config].
%% Stop the local apps and restore (or unset) the emqx data_dir that
%% setup/1 overwrote.
cleanup(Config) ->
    emqx_common_test_helpers:stop_apps(apps_to_start()),
    case ?config(previous_emqx_data_dir, Config) of
        undefined ->
            application:unset_env(emqx, data_dir);
        {ok, Previous} ->
            application:set_env(emqx, data_dir, Previous)
    end.
%% Dismantle the test cluster: make the nodes leave in reverse start
%% order, then stop each slave node.
cleanup_cluster(ClusterNodes) ->
    lists:foreach(
        fun(Node) -> rpc:call(Node, ekka, leave, []) end,
        lists:reverse(ClusterNodes)
    ),
    lists:foreach(fun emqx_common_test_helpers:stop_slave/1, ClusterNodes).
%% Generate 10 unique user names carrying the given binary prefix;
%% uniqueness comes from erlang:unique_integer/0 (not deterministic).
users(Prefix) ->
    MakeUser = fun(_N) ->
        Unique = integer_to_binary(abs(erlang:unique_integer())),
        <<Prefix/binary, Unique/binary>>
    end,
    lists:map(MakeUser, lists:seq(1, 10)).
%% Extract a sorted list of authenticator ids for order-insensitive
%% comparison of authentication configs across nodes.
authn_ids(AuthnConf) ->
    Ids = [emqx_authentication:authenticator_id(Conf) || Conf <- AuthnConf],
    lists:sort(Ids).
%% @doc Rebuild a "Major.Minor.Patch..." version string from its parts.
%% `Patch' is the tail returned by string:split(Version, ".", all): one or
%% more chardata segments, e.g. [<<"3">>] or [<<"0-rc">>, <<"1">>].
%% Fixes over the previous implementation:
%% - it incremented MajorInt here as well, duplicating the `+ 1' already
%%   applied at the call site in t_future_version/1, so the "future minor"
%%   sample also carried a bumped major and the minor check was never
%%   really exercised;
%% - multi-segment patch tails were concatenated without "." separators.
recompose_version(MajorInt, MinorInt, Patch) ->
    unicode:characters_to_list(
        lists:join($., [integer_to_list(MajorInt), integer_to_list(MinorInt) | Patch])
    ).
%% Start a 3-node test cluster (2 cores + 1 replicant) for the cluster
%% test cases. Core nodes join each other via `join_to'; the replicant
%% instead discovers the already-running cores through static seed-based
%% autocluster discovery. Returns node names as [Core1, Core2, Replicant].
cluster(Config) ->
    PrivDataDir = ?config(priv_dir, Config),
    [{Core1, Core1Opts}, {Core2, Core2Opts}, {Replicant, ReplOpts}] =
        emqx_common_test_helpers:emqx_cluster(
            [
                {core, data_backup_core1},
                {core, data_backup_core2},
                {replicant, data_backup_replicant}
            ],
            #{
                priv_data_dir => PrivDataDir,
                schema_mod => emqx_conf_schema,
                apps => apps_to_start(),
                load_apps => apps_to_start() ++ apps_to_load(),
                env => [{mria, db_backend, rlog}],
                load_schema => true,
                start_autocluster => true,
                join_to => true,
                %% no fixed listener ports: avoid clashes with the local node
                listener_ports => [],
                conf => [{[dashboard, listeners, http, bind], 0}],
                env_handler =>
                    fun(_) ->
                        application:set_env(emqx, boot_modules, [broker, router])
                    end
            }
        ),
    Node1 = emqx_common_test_helpers:start_slave(Core1, Core1Opts),
    Node2 = emqx_common_test_helpers:start_slave(Core2, Core2Opts),
    #{conf := _ReplConf, env := ReplEnv} = ReplOpts,
    %% replicant: drop join_to and point static discovery at the two cores
    ClusterDiscovery = {static, [{seeds, [Node1, Node2]}]},
    ReplOpts1 = maps:remove(
        join_to,
        ReplOpts#{
            env => [{ekka, cluster_discovery, ClusterDiscovery} | ReplEnv],
            env_handler => fun(_) ->
                application:set_env(emqx, boot_modules, [broker, router]),
                application:set_env(
                    ekka,
                    cluster_discovery,
                    ClusterDiscovery
                )
            end
        }
    ),
    ReplNode = emqx_common_test_helpers:start_slave(Replicant, ReplOpts1),
    [Node1, Node2, ReplNode].
%% Create (and wait for) the throw-away `data_backup_test' mria table with
%% the given attribute list; used to simulate a table whose schema changes
%% between export and import.
create_test_tab(Attributes) ->
    ok = mria:create_table(data_backup_test, [
        {type, set},
        {rlog_shard, data_backup_test_shard},
        {storage, disc_copies},
        {record_name, data_backup_test},
        {attributes, Attributes},
        {storage_properties, [
            {ets, [
                {read_concurrency, true},
                {write_concurrency, true}
            ]}
        ]}
    ]),
    ok = mria:wait_for_tables([data_backup_test]).
%% Applications started (and stopped) around each test case; the same app
%% set is deployed on the spawned cluster nodes.
apps_to_start() ->
    [
        emqx,
        emqx_conf,
        emqx_psk,
        emqx_management,
        emqx_dashboard,
        emqx_authz,
        emqx_authn,
        emqx_rule_engine,
        emqx_retainer,
        emqx_prometheus,
        emqx_modules,
        emqx_gateway,
        emqx_exhook,
        emqx_bridge,
        emqx_auto_subscribe
    ].
%% Gateway applications that are only loaded, never started, by this suite
%% (presumably so their app metadata is available during config import —
%% TODO confirm against emqx_common_test_helpers usage).
apps_to_load() ->
    [
        emqx_gateway_lwm2m,
        emqx_gateway_coap,
        emqx_gateway_exproto,
        emqx_gateway_stomp,
        emqx_gateway_mqttsn
    ].

View File

@ -98,9 +98,9 @@
-define(FORMAT_FUN, {?MODULE, format_delayed}).
-define(NOW, erlang:system_time(milli_seconds)).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
mnesia(boot) ->
ok = mria:create_table(?TAB, [
{type, ordered_set},
@ -110,9 +110,9 @@ mnesia(boot) ->
{attributes, record_info(fields, delayed_message)}
]).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
on_message_publish(
Msg = #message{
id = Id,
@ -143,9 +143,9 @@ on_message_publish(
on_message_publish(Msg) ->
{ok, Msg}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Start delayed publish server
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec start_link() -> emqx_types:startlink_ret().
start_link() ->
@ -270,9 +270,9 @@ post_config_update(_KeyPath, _ConfigReq, NewConf, _OldConf, _AppEnvs) ->
Enable = maps:get(enable, NewConf, undefined),
load_or_unload(Enable).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% gen_server callback
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
init([]) ->
ok = mria:wait_for_tables([?TAB]),
@ -335,9 +335,9 @@ terminate(_Reason, #{stats_timer := StatsTimer} = State) ->
code_change(_Vsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{delayed_message_count => non_neg_integer()}.
get_basic_usage_info() ->
@ -348,9 +348,9 @@ get_basic_usage_info() ->
end,
#{delayed_message_count => DelayedCount}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Ensure the stats
-spec ensure_stats_event(state()) -> state().

View File

@ -49,9 +49,12 @@
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%%--------------------------------------------------------------------
-define(update(_Rules_),
emqx_conf:update([rewrite], _Rules_, #{override_to => cluster})
).
%%------------------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
enable() ->
emqx_conf:add_handler([rewrite], ?MODULE),
@ -67,7 +70,7 @@ list() ->
emqx_conf:get_raw([<<"rewrite">>], []).
update(Rules0) ->
case emqx_conf:update([rewrite], Rules0, #{override_to => cluster}) of
case ?update(Rules0) of
{ok, _} ->
ok;
{error, Reason} ->
@ -109,18 +112,19 @@ rewrite_publish(Message = #message{topic = Topic}, Rules) ->
Binds = fill_client_binds(Message),
{ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}.
get_basic_usage_info() ->
RewriteRules = list(),
#{topic_rewrite_rule_count => length(RewriteRules)}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
compile(Rules) ->
lists:foldl(
fun(Rule, {Publish, Subscribe, Error}) ->

View File

@ -2,7 +2,7 @@
{application, emqx_psk, [
{description, "EMQX PSK"},
% strict semver, bump manually!
{vsn, "5.0.1"},
{vsn, "5.0.2"},
{modules, []},
{registered, [emqx_psk_sup]},
{applications, [kernel, stdlib]},

View File

@ -17,6 +17,8 @@
-module(emqx_psk).
-behaviour(gen_server).
-behaviour(emqx_db_backup).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
@ -48,6 +50,12 @@
insert_psks/1
]).
%% Data backup
-export([
import_config/1,
backup_tables/0
]).
-record(psk_entry, {
psk_id :: binary(),
shared_secret :: binary(),
@ -86,6 +94,12 @@ mnesia(boot) ->
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]).
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
backup_tables() -> [?TAB].
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
@ -115,9 +129,27 @@ start_link() ->
stop() ->
gen_server:stop(?MODULE).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
%% Import the `psk_authentication' section of a backup config by applying
%% it cluster-wide through emqx_conf:update/3.
import_config(#{<<"psk_authentication">> := PskConf}) ->
    case emqx_conf:update([psk_authentication], PskConf, #{override_to => cluster}) of
        {ok, _} ->
            %% only when the feature is enabled after the update:
            %% presumably (re)loads PSK entries — confirm in load/0
            case get_config(enable) of
                true -> load();
                false -> ok
            end,
            {ok, #{root_key => psk_authentication, changed => []}};
        Error ->
            {error, #{root_key => psk_authentication, reason => Error}}
    end;
%% Nothing to do when the backup carries no `psk_authentication' section.
import_config(_RawConf) ->
    {ok, #{root_key => psk_authentication, changed => []}}.
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
init(_Opts) ->
_ =

View File

@ -82,9 +82,9 @@
-callback clean(context()) -> ok.
-callback size(context()) -> non_neg_integer().
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Hook API
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec on_session_subscribed(_, _, emqx_types:subopts(), _) -> any().
on_session_subscribed(_, _, #{share := ShareName}, _) when ShareName =/= undefined ->
ok;
@ -118,9 +118,9 @@ on_message_publish(Msg = #message{flags = #{retain := true}}, Context) ->
on_message_publish(Msg, _) ->
{ok, Msg}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% @doc Start the retainer
-spec start_link() -> emqx_types:startlink_ret().
@ -169,9 +169,9 @@ call(Req) ->
stats_fun() ->
gen_server:cast(?MODULE, ?FUNCTION_NAME).
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
get_basic_usage_info() ->
@ -183,9 +183,9 @@ get_basic_usage_info() ->
#{retained_messages => 0}
end.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
init([]) ->
erlang:process_flag(trap_exit, true),
@ -248,9 +248,9 @@ terminate(_Reason, #{clear_timer := ClearTimer}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%%------------------------------------------------------------------------------
-spec new_state() -> state().
new_state() ->
#{

View File

@ -18,6 +18,7 @@
-behaviour(gen_server).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl").
@ -78,6 +79,11 @@
code_change/3
]).
%% Data backup
-export([
import_config/1
]).
-define(RULE_ENGINE, ?MODULE).
-define(T_CALL, infinity).
@ -105,7 +111,7 @@
start_link() ->
gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% The config handler for emqx_rule_engine
%%------------------------------------------------------------------------------
post_config_update(?RULE_PATH(RuleId), _Req, NewRule, undefined, _AppEnvs) ->
@ -142,9 +148,9 @@ post_config_update([rule_engine], _Req, #{rules := NewRules}, #{rules := OldRule
{error, Error}
end.
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% APIs for rules
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-spec load_rules() -> ok.
load_rules() ->
@ -185,9 +191,9 @@ delete_rule(RuleId) when is_binary(RuleId) ->
insert_rule(Rule) ->
gen_server:call(?RULE_ENGINE, {insert_rule, Rule}, ?T_CALL).
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Rule Management
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-spec get_rules() -> [rule()].
get_rules() ->
@ -301,9 +307,9 @@ unload_hooks_for_rule(#{id := Id, from := Topics}) ->
Topics
).
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Telemetry helper functions
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
-spec get_basic_usage_info() ->
#{
@ -362,9 +368,27 @@ tally_referenced_bridges(BridgeIDs, Acc0) ->
BridgeIDs
).
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
%% Data-backup import for the `rule_engine' root: merge imported rules
%% over the existing ones (imported rules win on id conflicts), apply the
%% result cluster-wide, and report which rule paths changed.
import_config(#{<<"rule_engine">> := #{<<"rules">> := NewRules} = RuleEngineConf}) ->
    OldRules = emqx:get_raw_config(?KEY_PATH, #{}),
    RuleEngineConf1 = RuleEngineConf#{<<"rules">> => maps:merge(OldRules, NewRules)},
    case emqx_conf:update([rule_engine], RuleEngineConf1, #{override_to => cluster}) of
        {ok, #{raw_config := #{<<"rules">> := NewRawRules}}} ->
            %% changed = rules whose raw config differs from before the import
            Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawRules, OldRules)),
            ChangedPaths = [?RULE_PATH(Id) || Id <- maps:keys(Changed)],
            {ok, #{root_key => rule_engine, changed => ChangedPaths}};
        Error ->
            {error, #{root_key => rule_engine, reason => Error}}
    end;
%% Nothing to do when the backup carries no rule_engine rules.
import_config(_RawConf) ->
    {ok, #{root_key => rule_engine, changed => []}}.
%%----------------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
init([]) ->
_TableId = ets:new(?KV_TAB, [
@ -404,9 +428,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
%%----------------------------------------------------------------------------------------
parse_and_insert(Params = #{id := RuleId, sql := Sql, actions := Actions}, CreatedAt) ->
case emqx_rule_sqlparser:parse(Sql) of

View File

@ -56,7 +56,8 @@
safe_to_existing_atom/2,
pub_props_to_packet/1,
safe_filename/1,
diff_lists/3
diff_lists/3,
merge_lists/3
]).
-export([
@ -819,6 +820,42 @@ diff_lists(New, Old, KeyFunc) when is_list(New) andalso is_list(Old) ->
changed => lists:reverse(Changed)
}.
%% @doc Merges two lists preserving the original order of elements in both lists.
%% KeyFunc must extract a unique key from each element.
%% If a key exists in both lists, the value in List1 is superseded by the value in List2, but
%% the element position in the result list will equal its position in List1.
%% Elements present only in List2 are appended, keeping their List2 order.
%% Example:
%% emqx_utils:merge_lists(
%%     [#{id => a, val => old}, #{id => b, val => old}],
%%     [#{id => a, val => new}, #{id => c}, #{id => b, val => new}, #{id => d}],
%%     fun(#{id := Id}) -> Id end).
%% [#{id => a,val => new},
%%  #{id => b,val => new},
%%  #{id => c},
%%  #{id => d}]
-spec merge_lists(list(T), list(T), KeyFunc) -> list(T) when
    KeyFunc :: fun((T) -> any()),
    T :: any().
merge_lists(List1, List2, KeyFunc) ->
    %% Index List2 by key once, so each lookup below is O(log n) map access
    %% instead of a linear lists:keyfind scan (the previous version was O(n^2)).
    %% Keys are unique by contract, so maps:from_list/1 loses nothing.
    Overrides = maps:from_list([{KeyFunc(E), E} || E <- List2]),
    Keys1 = [KeyFunc(E) || E <- List1],
    KeySet1 = maps:from_list([{K, true} || K <- Keys1]),
    %% List1 order, with values overridden by List2 where keys collide.
    Merged = [maps:get(K, E1, E1) || {K, E1} <- lists:zip(Keys1, List1), false] ++
        [maps:get(K, Overrides, E1) || {K, E1} <- lists:zip(Keys1, List1)],
    %% Elements whose keys appear only in List2, appended in List2 order.
    Appended = [E || E <- List2, not maps:is_key(KeyFunc(E), KeySet1)],
    Merged ++ Appended.
search(_ExpectValue, _KeyFunc, []) ->
false;
search(ExpectValue, KeyFunc, [Item | List]) ->

View File

@ -0,0 +1,4 @@
Implement configuration and user data import/export CLI.
The `emqx ctl export` and `emqx ctl import` commands allow exporting configuration and built-in database
data from a running EMQX cluster and importing it into the same or another running EMQX cluster.

View File

@ -5,6 +5,7 @@
-behaviour(gen_server).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-include("emqx_ee_schema_registry.hrl").
-include_lib("emqx/include/logger.hrl").
@ -13,9 +14,7 @@
%% API
-export([
start_link/0,
get_serde/1,
add_schema/2,
get_schema/1,
delete_schema/1,
@ -34,6 +33,11 @@
%% `emqx_config_handler' API
-export([post_config_update/5]).
%% Data backup
-export([
import_config/1
]).
-type schema() :: #{
type := serde_type(),
source := binary(),
@ -129,7 +133,50 @@ post_config_update(
{error, Reason, SerdesToRollback} ->
lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
{error, Reason}
end.
end;
%% Handles a config update on the schema-registry root path: diff the old and
%% new `schemas' maps, tear down serdes for removed schemas, and (re)build
%% serdes for added/changed ones.
post_config_update(?CONF_KEY_PATH, _Cmd, NewConf = #{schemas := NewSchemas}, OldConf, _AppEnvs) ->
    OldSchemas = maps:get(schemas, OldConf, #{}),
    #{
        added := Added,
        changed := Changed0,
        removed := Removed
    } = emqx_utils_maps:diff_maps(NewSchemas, OldSchemas),
    %% diff_maps reports changed entries as {Old, New} pairs; keep only the new value.
    Changed = maps:map(fun(_N, {_Old, New}) -> New end, Changed0),
    RemovedNames = maps:keys(Removed),
    case RemovedNames of
        [] ->
            ok;
        _ ->
            %% Deletion is fired asynchronously; its result does not gate this update.
            async_delete_serdes(RemovedNames)
    end,
    SchemasToBuild = maps:to_list(maps:merge(Changed, Added)),
    case build_serdes(SchemasToBuild) of
        ok ->
            {ok, NewConf};
        {error, Reason, SerdesToRollback} ->
            %% Partial build failure: remove the serdes built so far, reject the update.
            lists:foreach(fun ensure_serde_absent/1, SerdesToRollback),
            {error, Reason}
    end;
%% Any other config path: accept the new config without side effects.
post_config_update(_Path, _Cmd, NewConf, _OldConf, _AppEnvs) ->
    {ok, NewConf}.
%%-------------------------------------------------------------------------------------------------
%% Data backup
%%-------------------------------------------------------------------------------------------------
%% @doc Import the `schema_registry' section of a raw backup config (data backup hook).
%% Imported schemas are merged over the currently configured ones: schemas with the
%% same name are overridden by the import, new ones are added, the rest are kept.
%% Returns the config paths of all schemas that actually changed.
import_config(#{<<"schema_registry">> := SchemaRegConf = #{<<"schemas">> := ImportedSchemas}}) ->
    CurrentSchemas = emqx:get_raw_config([?CONF_KEY_ROOT, schemas], #{}),
    MergedConf = SchemaRegConf#{<<"schemas">> => maps:merge(CurrentSchemas, ImportedSchemas)},
    case emqx_conf:update(?CONF_KEY_PATH, MergedConf, #{override_to => cluster}) of
        {ok, #{raw_config := #{<<"schemas">> := RawSchemasAfter}}} ->
            %% Diff against the pre-import schemas to report only what changed.
            #{changed := ChangedSchemas} =
                emqx_utils_maps:diff_maps(RawSchemasAfter, CurrentSchemas),
            Paths = [[?CONF_KEY_ROOT, schemas, Name] || Name <- maps:keys(ChangedSchemas)],
            {ok, #{root_key => ?CONF_KEY_ROOT, changed => Paths}};
        Error ->
            {error, #{root_key => ?CONF_KEY_ROOT, reason => Error}}
    end;
import_config(_RawConf) ->
    %% Backup contains no schema_registry section: nothing to import.
    {ok, #{root_key => ?CONF_KEY_ROOT, changed => []}}.
%%-------------------------------------------------------------------------------------------------
%% `gen_server' API

View File

@ -11,9 +11,13 @@
%% @doc Application start callback. Waits for the schema-registry mria shard to
%% be replicated before registering config handlers and starting the supervisor.
start(_StartType, _StartArgs) ->
    ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity),
    %% HTTP API handler (per-schema config path)
    emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry),
    %% Conf load / data import handler (whole schema-registry root)
    emqx_conf:add_handler(?CONF_KEY_PATH, emqx_ee_schema_registry),
    emqx_ee_schema_registry_sup:start_link().

%% @doc Application stop callback: deregister both config handlers added in start/2.
stop(_State) ->
    emqx_conf:remove_handler([?CONF_KEY_ROOT, schemas, '?']),
    emqx_conf:remove_handler(?CONF_KEY_PATH),
    ok.

View File

@ -700,3 +700,34 @@ t_cluster_serde_build(Config) ->
]
),
ok.
%% Verifies emqx_ee_schema_registry:import_config/1 change reporting:
%% importing an identical config yields an empty `changed' list, while importing
%% a modified schema reports that schema's config path as changed.
t_import_config(_Config) ->
    RawConf = #{
        <<"schema_registry">> =>
            #{
                <<"schemas">> =>
                    #{
                        <<"my_avro_schema">> =>
                            #{
                                <<"description">> => <<"My Avro Schema">>,
                                <<"source">> =>
                                    <<"{\"type\":\"record\",\"fields\":[{\"type\":\"int\",\"name\":\"i\"},{\"type\":\"string\",\"name\":\"s\"}]}">>,
                                <<"type">> => <<"avro">>
                            }
                    }
            }
    },
    %% Same config with only the description changed.
    RawConf1 = emqx_utils_maps:deep_put(
        [<<"schema_registry">>, <<"schemas">>, <<"my_avro_schema">>, <<"description">>],
        RawConf,
        <<"Updated description">>
    ),
    Path = [schema_registry, schemas, <<"my_avro_schema">>],
    %% First import reports no changes — NOTE(review): this presumes the running
    %% config already matches RawConf (i.e. the schema exists from suite setup);
    %% confirm against init_per_testcase/suite setup.
    ?assertEqual(
        {ok, #{root_key => schema_registry, changed => []}},
        emqx_ee_schema_registry:import_config(RawConf)
    ),
    %% Importing the modified config reports the schema's path as changed.
    ?assertEqual(
        {ok, #{root_key => schema_registry, changed => [Path]}},
        emqx_ee_schema_registry:import_config(RawConf1)
    ).