diff --git a/apps/emqx/src/bhvrs/emqx_db_backup.erl b/apps/emqx/src/bhvrs/emqx_db_backup.erl index 310baeb9b..4b8107565 100644 --- a/apps/emqx/src/bhvrs/emqx_db_backup.erl +++ b/apps/emqx/src/bhvrs/emqx_db_backup.erl @@ -18,6 +18,8 @@ -type traverse_break_reason() :: over | migrate. +-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}. + -callback backup_tables() -> [mria:table()]. %% validate the backup @@ -31,6 +33,9 @@ -callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}. --optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]). +%% NOTE: currently, this is called only when the table has been restored successfully. +-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}. + +-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]). -export_type([traverse_break_reason/0]). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index c919aa506..50ddc33ca 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -408,9 +408,13 @@ export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) -> do_export_mnesia_tab(TabName, BackupName) -> Node = node(), try - {ok, TabName, [Node]} = mnesia:activate_checkpoint( - [{name, TabName}, {min, [TabName]}, {allow_remote, false}] - ), + Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}], + Opts = + case mnesia:table_info(TabName, storage_type) of + ram_copies -> [{ram_overrides_dump, true} | Opts0]; + _ -> Opts0 + end, + {ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts), MnesiaBackupName = mnesia_backup_name(BackupName, TabName), ok = filelib:ensure_dir(MnesiaBackupName), ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName), @@ -549,6 +553,8 @@ import_mnesia_tabs(BackupDir, Opts) -> ) ). +-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) -> + ok | {ok, no_backup_file} | {error, term()} | no_return(). import_mnesia_tab(BackupDir, Mod, TabName, Opts) -> MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), case filelib:is_regular(MnesiaBackupFileName) of @@ -572,7 +578,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]), case Restored of {atomic, [TabName]} -> - ok; + on_table_imported(Mod, TabName, Opts); RestoreErr -> ?SLOG(error, #{ msg => "failed_to_restore_mnesia_backup", @@ -598,6 +604,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> _ = file:delete(MnesiaBackupFileName) end. +on_table_imported(Mod, Tab, Opts) -> + case erlang:function_exported(Mod, on_backup_table_imported, 2) of + true -> + try + Mod:on_backup_table_imported(Tab, Opts) + catch + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "post_database_import_callback_failed", + table => Tab, + module => Mod, + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + end; + false -> + ok + end. + %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% Looks like there is no clean way to abort traversal without triggering any error reporting, %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index fee392479..4e584824d 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -114,6 +115,64 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) -> end, ok. +t_import_retained_messages(Config) -> + FName = "emqx-export-ce-retained-msgs-test.tar.gz", + BackupFile = filename:join(?config(data_dir, Config), FName), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), + %% verify that retainer messages are imported + ?assertMatch( + {ok, [#message{payload = <<"Hi 1!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test1">>) + ), + ?assertMatch( + {ok, [#message{payload = <<"Hi 5!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test5">>) + ), + + %% verify that messages are re-indexed + ?assertMatch( + {ok, _, [ + #message{payload = <<"Hi 5!!!">>}, + #message{payload = <<"Hi 4!!!">>}, + #message{payload = <<"Hi 3!!!">>}, + #message{payload = <<"Hi 2!!!">>}, + #message{payload = <<"Hi 1!!!">>} + ]}, + emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5) + ), + %% Export and import again + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)). + +t_export_ram_retained_messages(_Config) -> + {ok, _} = emqx_retainer:update_config( + #{ + <<"enable">> => true, + <<"backend">> => #{<<"storage_type">> => <<"ram">>} + } + ), + ?assertEqual(ram_copies, mnesia:table_info(emqx_retainer_message, storage_type)), + Topic = <<"t/backup_test_export_retained_ram/1">>, + Payload = <<"backup_test_retained_ram">>, + Msg = emqx_message:make( + <<"backup_test">>, + ?QOS_0, + Topic, + Payload, + #{retain => true}, + #{} + ), + _ = emqx_broker:publish(Msg), + {ok, #{filename := BackupFileName}} = emqx_mgmt_data_backup:export(), + ok = emqx_retainer:delete(Topic), + ?assertEqual({ok, []}, emqx_retainer:read_message(Topic)), + ?assertEqual( + {ok, #{db_errors => #{}, config_errors => #{}}}, + emqx_mgmt_data_backup:import(BackupFileName) + ), + ?assertMatch({ok, [#message{payload = Payload}]}, emqx_retainer:read_message(Topic)). + t_cluster_hocon_export_import(Config) -> RawConfBeforeImport = emqx:get_raw_config([]), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz new file mode 100644 index 000000000..6442a03a1 Binary files /dev/null and b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz differ diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 7bcde8d50..4f778769d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index daaa776b7..cb9aa14f7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -56,7 +56,10 @@ -export([populate_index_meta/0]). -export([reindex/3]). --export([backup_tables/0]). +-export([ + backup_tables/0, + on_backup_table_imported/2 +]). -record(retained_message, {topic, msg, expiry_time}). -record(retained_index, {key, expiry_time}). @@ -80,8 +83,45 @@ topics() -> %%-------------------------------------------------------------------- %% Data backup %%-------------------------------------------------------------------- + backup_tables() -> - [?TAB_MESSAGE]. + [?TAB_MESSAGE || is_enabled()]. + +on_backup_table_imported(?TAB_MESSAGE, Opts) -> + case is_enabled() of + true -> + maybe_print("Starting reindexing retained messages ~n", [], Opts), + Res = reindex(false, mk_status_fun(Opts)), + maybe_print("Reindexing retained messages finished~n", [], Opts), + Res; + false -> + ok + end; +on_backup_table_imported(_Tab, _Opts) -> + ok. + +mk_status_fun(Opts) -> + fun(Done) -> + log_status(Done), + maybe_print("Reindexed ~p messages~n", [Done], Opts) + end. + +maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) -> + Fun(Fmt, Args); +maybe_print(_Fmt, _Args, _Opts) -> + ok. + +log_status(Done) -> + ?SLOG( + info, + #{ + msg => "retainer_message_record_reindexing_progress", + done => Done + } + ). + +is_enabled() -> + emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE. %%-------------------------------------------------------------------- %% emqx_retainer callbacks diff --git a/changes/ce/fix-13293.en.md b/changes/ce/fix-13293.en.md new file mode 100644 index 000000000..0d5734e4c --- /dev/null +++ b/changes/ce/fix-13293.en.md @@ -0,0 +1,4 @@ +- Automatically re-index imported retained messages during restoring a data backup file. Previously, it was needed to manually trigger re-indexing with `emqx ctl retainer reindex start` CLI + after the data backup file is imported. + +- Allow exporting retained messages to a backup file if the configured storage_type (`retainer.backend.storage_type`) is `ram`. Previously, retained messages could be exported only if `disc` storage_type was configured.