diff --git a/apps/emqx/src/bhvrs/emqx_db_backup.erl b/apps/emqx/src/bhvrs/emqx_db_backup.erl index 310baeb9b..4b8107565 100644 --- a/apps/emqx/src/bhvrs/emqx_db_backup.erl +++ b/apps/emqx/src/bhvrs/emqx_db_backup.erl @@ -18,6 +18,8 @@ -type traverse_break_reason() :: over | migrate. +-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}. + -callback backup_tables() -> [mria:table()]. %% validate the backup @@ -31,6 +33,9 @@ -callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}. --optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]). +%% NOTE: currently, this is called only when the table has been restored successfully. +-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}. + +-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]). -export_type([traverse_break_reason/0]). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index c919aa506..ab0deaa87 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -549,6 +549,8 @@ import_mnesia_tabs(BackupDir, Opts) -> ) ). +-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) -> + ok | {ok, no_backup_file} | {error, term()} | no_return(). import_mnesia_tab(BackupDir, Mod, TabName, Opts) -> MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), case filelib:is_regular(MnesiaBackupFileName) of @@ -572,7 +574,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]), case Restored of {atomic, [TabName]} -> - ok; + on_table_imported(Mod, TabName, Opts); RestoreErr -> ?SLOG(error, #{ msg => "failed_to_restore_mnesia_backup", @@ -598,6 +600,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> _ = file:delete(MnesiaBackupFileName) end. +on_table_imported(Mod, Tab, Opts) -> + case erlang:function_exported(Mod, on_backup_table_imported, 2) of + true -> + try + Mod:on_backup_table_imported(Tab, Opts) + catch + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "post_database_import_callback_failed", + table => Tab, + module => Mod, + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + end; + false -> + ok + end. + %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% Looks like there is no clean way to abort traversal without triggering any error reporting, %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index fee392479..fb5d5988c 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -114,6 +114,36 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) -> end, ok. +t_import_retained_messages(Config) -> + FName = "emqx-export-ce-retained-msgs-test.tar.gz", + BackupFile = filename:join(?config(data_dir, Config), FName), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), + %% verify that retainer messages are imported + ?assertMatch( + {ok, [#message{payload = <<"Hi 1!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test1">>) + ), + ?assertMatch( + {ok, [#message{payload = <<"Hi 5!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test5">>) + ), + + %% verify that messages are re-indexed + ?assertMatch( + {ok, _, [ + #message{payload = <<"Hi 5!!!">>}, + #message{payload = <<"Hi 4!!!">>}, + #message{payload = <<"Hi 3!!!">>}, + #message{payload = <<"Hi 2!!!">>}, + #message{payload = <<"Hi 1!!!">>} + ]}, + emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5) + ), + %% Export and import again + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)). + t_cluster_hocon_export_import(Config) -> RawConfBeforeImport = emqx:get_raw_config([]), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz new file mode 100644 index 000000000..6442a03a1 Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz differ diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 7bcde8d50..4f778769d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index daaa776b7..cb9aa14f7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -56,7 +56,10 @@ -export([populate_index_meta/0]). -export([reindex/3]). --export([backup_tables/0]). +-export([ + backup_tables/0, + on_backup_table_imported/2 +]). -record(retained_message, {topic, msg, expiry_time}). -record(retained_index, {key, expiry_time}). @@ -80,8 +83,45 @@ topics() -> %%-------------------------------------------------------------------- %% Data backup %%-------------------------------------------------------------------- + backup_tables() -> - [?TAB_MESSAGE]. + [?TAB_MESSAGE || is_enabled()]. + +on_backup_table_imported(?TAB_MESSAGE, Opts) -> + case is_enabled() of + true -> + maybe_print("Starting reindexing retained messages ~n", [], Opts), + Res = reindex(false, mk_status_fun(Opts)), + maybe_print("Reindexing retained messages finished~n", [], Opts), + Res; + false -> + ok + end; +on_backup_table_imported(_Tab, _Opts) -> + ok. + +mk_status_fun(Opts) -> + fun(Done) -> + log_status(Done), + maybe_print("Reindexed ~p messages~n", [Done], Opts) + end. + +maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) -> + Fun(Fmt, Args); +maybe_print(_Fmt, _Args, _Opts) -> + ok. + +log_status(Done) -> + ?SLOG( + info, + #{ + msg => "retainer_message_record_reindexing_progress", + done => Done + } + ). + +is_enabled() -> + emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE. %%-------------------------------------------------------------------- %% emqx_retainer callbacks