feat(backup): add migration mechanism when import backup data

This commit is contained in:
firest 2023-10-17 14:38:16 +08:00
parent db3915d472
commit 26ec860d96
4 changed files with 180 additions and 41 deletions

View File

@ -16,4 +16,21 @@
-module(emqx_db_backup). -module(emqx_db_backup).
-type traverse_break_reason() :: over | migrate.
-callback backup_tables() -> [mria:table()]. -callback backup_tables() -> [mria:table()].
%% validate the backup
%% return `ok` to traverse the next item
%% return `{ok, over}` to finish the traverse
%% return `{ok, migrate}` to call the migration callback
-callback validate_mnesia_backup(tuple()) ->
ok
| {ok, traverse_break_reason()}
| {error, term()}.
-callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]).
-export_type([traverse_break_reason/0]).

View File

@ -38,7 +38,7 @@
-export([authorize/4]). -export([authorize/4]).
-export([post_config_update/5]). -export([post_config_update/5]).
-export([backup_tables/0]). -export([backup_tables/0, validate_mnesia_backup/1, migrate_mnesia_backup/1]).
%% Internal exports (RPC) %% Internal exports (RPC)
-export([ -export([
@ -84,6 +84,35 @@ mnesia(boot) ->
backup_tables() -> [?APP]. backup_tables() -> [?APP].
validate_mnesia_backup({schema, _Tab, CreateList} = Schema) ->
case emqx_mgmt_data_backup:default_validate_mnesia_backup(Schema) of
ok ->
{ok, over};
_ ->
case proplists:get_value(attributes, CreateList) of
[name, api_key, api_secret_hash, enable, desc, expired_at, created_at] ->
{ok, migrate};
Fields ->
{error, {unknow_fields, Fields}}
end
end;
validate_mnesia_backup(_Other) ->
ok.
migrate_mnesia_backup({schema, Tab, CreateList}) ->
case proplists:get_value(attributes, CreateList) of
[name, api_key, api_secret_hash, enable, desc, expired_at, created_at] = Fields ->
NewFields = Fields ++ [role, extra],
CreateList2 = lists:keyreplace(
attributes, 1, CreateList, {attributes, NewFields}
),
{ok, {schema, Tab, CreateList2}};
Fields ->
{error, {unknow_fields, Fields}}
end;
migrate_mnesia_backup(Data) ->
{ok, do_table_migrate(Data)}.
post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) -> post_config_update([api_key], _Req, NewConf, _OldConf, _AppEnvs) ->
#{bootstrap_file := File} = NewConf, #{bootstrap_file := File} = NewConf,
case init_bootstrap_file(File) of case init_bootstrap_file(File) of
@ -203,7 +232,9 @@ ensure_not_undefined(New, _Old) -> New.
to_map(Apps) when is_list(Apps) -> to_map(Apps) when is_list(Apps) ->
[to_map(App) || App <- Apps]; [to_map(App) || App <- Apps];
to_map(#?APP{name = N, api_key = K, enable = E, expired_at = ET, created_at = CT, desc = D}) -> to_map(#?APP{
name = N, api_key = K, enable = E, expired_at = ET, created_at = CT, desc = D, role = Role
}) ->
#{ #{
name => N, name => N,
api_key => K, api_key => K,
@ -211,7 +242,8 @@ to_map(#?APP{name = N, api_key = K, enable = E, expired_at = ET, created_at = CT
expired_at => ET, expired_at => ET,
created_at => CT, created_at => CT,
desc => D, desc => D,
expired => is_expired(ET) expired => is_expired(ET),
role => Role
}. }.
is_expired(undefined) -> false; is_expired(undefined) -> false;
@ -397,24 +429,22 @@ maybe_migrate_table(Fields) ->
true -> true ->
ok; ok;
false -> false ->
TransFun = fun(App) -> TransFun = fun do_table_migrate/1,
case App of
{?APP, Name, Key, Hash, Enable, Desc, ExpiredAt, CreatedAt} ->
#?APP{
name = Name,
api_key = Key,
api_secret_hash = Hash,
enable = Enable,
desc = Desc,
expired_at = ExpiredAt,
created_at = CreatedAt,
role = ?ROLE_API_VIEWER,
extra = #{}
};
#?APP{} ->
App
end
end,
{atomic, ok} = mnesia:transform_table(?APP, TransFun, Fields, ?APP), {atomic, ok} = mnesia:transform_table(?APP, TransFun, Fields, ?APP),
ok ok
end. end.
do_table_migrate({?APP, Name, Key, Hash, Enable, Desc, ExpiredAt, CreatedAt}) ->
#?APP{
name = Name,
api_key = Key,
api_secret_hash = Hash,
enable = Enable,
desc = Desc,
expired_at = ExpiredAt,
created_at = CreatedAt,
role = ?ROLE_API_DEFAULT,
extra = #{}
};
do_table_migrate(#?APP{} = App) ->
App.

View File

@ -24,6 +24,8 @@
format_error/1 format_error/1
]). ]).
-export([default_validate_mnesia_backup/1]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
@ -223,7 +225,15 @@ export_cluster_hocon(TarDescriptor, BackupBaseName, Opts) ->
export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) -> export_mnesia_tabs(TarDescriptor, BackupName, BackupBaseName, Opts) ->
maybe_print("Exporting built-in database...~n", [], Opts), maybe_print("Exporting built-in database...~n", [], Opts),
lists:foreach( lists:foreach(
fun(Tab) -> export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts) end, fun(Mod) ->
Tabs = Mod:backup_tables(),
lists:foreach(
fun(Tab) ->
export_mnesia_tab(TarDescriptor, Tab, BackupName, BackupBaseName, Opts)
end,
Tabs
)
end,
tabs_to_backup() tabs_to_backup()
). ).
@ -259,7 +269,7 @@ tabs_to_backup() ->
-endif. -endif.
mnesia_tabs_to_backup() -> mnesia_tabs_to_backup() ->
lists:flatten([M:backup_tables() || M <- find_behaviours(emqx_db_backup)]). lists:flatten([M || M <- find_behaviours(emqx_db_backup)]).
mnesia_backup_name(Path, TabName) -> mnesia_backup_name(Path, TabName) ->
filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]). filename:join([Path, ?BACKUP_MNESIA_DIR, atom_to_list(TabName)]).
@ -364,36 +374,42 @@ import_mnesia_tabs(BackupDir, Opts) ->
maybe_print("Importing built-in database...~n", [], Opts), maybe_print("Importing built-in database...~n", [], Opts),
filter_errors( filter_errors(
lists:foldr( lists:foldr(
fun(Tab, Acc) -> Acc#{Tab => import_mnesia_tab(BackupDir, Tab, Opts)} end, fun(Mod, Acc) ->
Tabs = Mod:backup_tables(),
lists:foldr(
fun(Tab, InAcc) ->
InAcc#{Tab => import_mnesia_tab(BackupDir, Mod, Tab, Opts)}
end,
Acc,
Tabs
)
end,
#{}, #{},
tabs_to_backup() tabs_to_backup()
) )
). ).
import_mnesia_tab(BackupDir, TabName, Opts) -> import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
case filelib:is_regular(MnesiaBackupFileName) of case filelib:is_regular(MnesiaBackupFileName) of
true -> true ->
maybe_print("Importing ~p database table...~n", [TabName], Opts), maybe_print("Importing ~p database table...~n", [TabName], Opts),
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts); restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts);
false -> false ->
maybe_print("No backup file for ~p database table...~n", [TabName], Opts), maybe_print("No backup file for ~p database table...~n", [TabName], Opts),
?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}), ?SLOG(info, #{msg => "missing_mnesia_backup", table => TabName, backup => BackupDir}),
ok ok
end. end.
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
Validated = Validated = validate_mnesia_backup(MnesiaBackupFileName, Mod),
catch mnesia:traverse_backup(
MnesiaBackupFileName, mnesia_backup, dummy, read_only, fun validate_mnesia_backup/2, 0
),
try try
case Validated of case Validated of
{ok, _} -> {ok, #{backup_file := BackupFile}} ->
%% As we use keep_tables option, we don't need to modify 'copies' (nodes) %% As we use keep_tables option, we don't need to modify 'copies' (nodes)
%% in a backup file before restoring it, as `mnsia:restore/2` will ignore %% in a backup file before restoring it, as `mnsia:restore/2` will ignore
%% backed-up schema and keep the current table schema unchanged %% backed-up schema and keep the current table schema unchanged
Restored = mnesia:restore(MnesiaBackupFileName, [{default_op, keep_tables}]), Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
case Restored of case Restored of
{atomic, [TabName]} -> {atomic, [TabName]} ->
ok; ok;
@ -425,17 +441,81 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) ->
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting, %% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...
validate_mnesia_backup({schema, Tab, CreateList} = Schema, Acc) -> validate_mnesia_backup(MnesiaBackupFileName, Mod) ->
Init = #{backup_file => MnesiaBackupFileName},
Validated =
catch mnesia:traverse_backup(
MnesiaBackupFileName,
mnesia_backup,
dummy,
read_only,
mnesia_backup_validator(Mod),
Init
),
case Validated of
ok ->
{ok, Init};
{error, {_, over}} ->
{ok, Init};
{error, {_, migrate}} ->
migrate_mnesia_backup(MnesiaBackupFileName, Mod, Init);
Error ->
Error
end.
%% if the module has validator callback, use it else use the default
mnesia_backup_validator(Mod) ->
Validator =
case erlang:function_exported(Mod, validate_mnesia_backup, 1) of
true ->
fun Mod:validate_mnesia_backup/1;
_ ->
fun default_validate_mnesia_backup/1
end,
fun(Schema, Acc) ->
case Validator(Schema) of
ok ->
{[Schema], Acc};
{ok, Break} ->
throw({error, Break});
Error ->
throw(Error)
end
end.
default_validate_mnesia_backup({schema, Tab, CreateList}) ->
ImportAttributes = proplists:get_value(attributes, CreateList), ImportAttributes = proplists:get_value(attributes, CreateList),
Attributes = mnesia:table_info(Tab, attributes), Attributes = mnesia:table_info(Tab, attributes),
case ImportAttributes =/= Attributes of case ImportAttributes == Attributes of
true -> true ->
throw({error, different_table_schema}); ok;
false -> false ->
{[Schema], Acc} {error, different_table_schema}
end; end;
validate_mnesia_backup(Other, Acc) -> default_validate_mnesia_backup(_Other) ->
{[Other], Acc}. ok.
migrate_mnesia_backup(MnesiaBackupFileName, Mod, Acc) ->
case erlang:function_exported(Mod, migrate_mnesia_backup, 1) of
true ->
MigrateFile = MnesiaBackupFileName ++ ".migrate",
Migrator = fun(Schema, InAcc) ->
case Mod:migrate_mnesia_backup(Schema) of
{ok, NewSchema} ->
{[NewSchema], InAcc};
Error ->
throw(Error)
end
end,
catch mnesia:traverse_backup(
MnesiaBackupFileName,
MigrateFile,
Migrator,
Acc#{backup_file := MigrateFile}
);
_ ->
{error, no_migrator}
end.
extract_backup(BackupFileName) -> extract_backup(BackupFileName) ->
BackupDir = root_backup_dir(), BackupDir = root_backup_dir(),

View File

@ -23,6 +23,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(ROLE_SUPERUSER, <<"administrator">>). -define(ROLE_SUPERUSER, <<"administrator">>).
-define(ROLE_API_SUPERUSER, <<"api_administrator">>).
-define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz"). -define(BOOTSTRAP_BACKUP, "emqx-export-test-bootstrap-ce.tar.gz").
all() -> all() ->
@ -56,7 +57,7 @@ init_per_testcase(TC = t_verify_imported_mnesia_tab_on_cluster, Config) ->
[{cluster, cluster(TC, Config)} | setup(TC, Config)]; [{cluster, cluster(TC, Config)} | setup(TC, Config)];
init_per_testcase(t_mnesia_bad_tab_schema, Config) -> init_per_testcase(t_mnesia_bad_tab_schema, Config) ->
meck:new(emqx_mgmt_data_backup, [passthrough]), meck:new(emqx_mgmt_data_backup, [passthrough]),
meck:expect(TC = emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [data_backup_test]), meck:expect(TC = emqx_mgmt_data_backup, mnesia_tabs_to_backup, 0, [?MODULE]),
setup(TC, Config); setup(TC, Config);
init_per_testcase(TC, Config) -> init_per_testcase(TC, Config) ->
setup(TC, Config). setup(TC, Config).
@ -99,7 +100,15 @@ t_cluster_hocon_export_import(Config) ->
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)), ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)),
?assertEqual(RawConfAfterImport, emqx:get_raw_config([])), ?assertEqual(RawConfAfterImport, emqx:get_raw_config([])),
%% lookup file inside <data_dir>/backup %% lookup file inside <data_dir>/backup
?assertEqual(Exp, emqx_mgmt_data_backup:import(filename:basename(FileName))). ?assertEqual(Exp, emqx_mgmt_data_backup:import(filename:basename(FileName))),
%% backup data migration test
?assertMatch([_, _, _], ets:tab2list(emqx_app)),
?assertMatch(
{ok, #{name := <<"key_to_export2">>, role := ?ROLE_API_SUPERUSER}},
emqx_mgmt_auth:read(<<"key_to_export2">>)
),
ok.
t_ee_to_ce_backup(Config) -> t_ee_to_ce_backup(Config) ->
case emqx_release:edition() of case emqx_release:edition() of
@ -329,6 +338,9 @@ t_verify_imported_mnesia_tab_on_cluster(Config) ->
timer:sleep(3000), timer:sleep(3000),
?assertEqual(AllUsers, lists:sort(rpc:call(ReplicantNode, mnesia, dirty_all_keys, [Tab]))). ?assertEqual(AllUsers, lists:sort(rpc:call(ReplicantNode, mnesia, dirty_all_keys, [Tab]))).
backup_tables() ->
[data_backup_test].
t_mnesia_bad_tab_schema(_Config) -> t_mnesia_bad_tab_schema(_Config) ->
OldAttributes = [id, name, description], OldAttributes = [id, name, description],
ok = create_test_tab(OldAttributes), ok = create_test_tab(OldAttributes),