fix(data_backup): re-index retained messages after importing

This commit is contained in:
Serge Tupchii 2024-06-18 16:14:13 +03:00
parent 6bde6aa711
commit 60882a616e
6 changed files with 103 additions and 5 deletions

View File

@ -18,6 +18,8 @@
-type traverse_break_reason() :: over | migrate.
-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}.
-callback backup_tables() -> [mria:table()].
%% validate the backup
@ -31,6 +33,9 @@
-callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]).
%% NOTE: currently, this is called only when the table has been restored successfully.
-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]).
-export_type([traverse_break_reason/0]).

View File

@ -549,6 +549,8 @@ import_mnesia_tabs(BackupDir, Opts) ->
)
).
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
ok | {ok, no_backup_file} | {error, term()} | no_return().
import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
case filelib:is_regular(MnesiaBackupFileName) of
@ -572,7 +574,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
case Restored of
{atomic, [TabName]} ->
ok;
on_table_imported(Mod, TabName, Opts);
RestoreErr ->
?SLOG(error, #{
msg => "failed_to_restore_mnesia_backup",
@ -598,6 +600,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
_ = file:delete(MnesiaBackupFileName)
end.
on_table_imported(Mod, Tab, Opts) ->
case erlang:function_exported(Mod, on_backup_table_imported, 2) of
true ->
try
Mod:on_backup_table_imported(Tab, Opts)
catch
Class:Reason:Stack ->
?SLOG(error, #{
msg => "post_database_import_callback_failed",
table => Tab,
module => Mod,
exception => Class,
reason => Reason,
stacktrace => Stack
}),
{error, Reason}
end;
false ->
ok
end.
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...

View File

@ -114,6 +114,36 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
end,
ok.
t_import_retained_messages(Config) ->
FName = "emqx-export-ce-retained-msgs-test.tar.gz",
BackupFile = filename:join(?config(data_dir, Config), FName),
Exp = {ok, #{db_errors => #{}, config_errors => #{}}},
?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)),
%% verify that retainer messages are imported
?assertMatch(
{ok, [#message{payload = <<"Hi 1!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test1">>)
),
?assertMatch(
{ok, [#message{payload = <<"Hi 5!!!">>}]},
emqx_retainer:read_message(<<"t/backup-retainer/test5">>)
),
%% verify that messages are re-indexed
?assertMatch(
{ok, _, [
#message{payload = <<"Hi 5!!!">>},
#message{payload = <<"Hi 4!!!">>},
#message{payload = <<"Hi 3!!!">>},
#message{payload = <<"Hi 2!!!">>},
#message{payload = <<"Hi 1!!!">>}
]},
emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5)
),
%% Export and import again
{ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(),
?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)).
t_cluster_hocon_export_import(Config) ->
RawConfBeforeImport = emqx:get_raw_config([]),
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.23"},
{vsn, "5.0.24"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -56,7 +56,10 @@
-export([populate_index_meta/0]).
-export([reindex/3]).
-export([backup_tables/0]).
-export([
backup_tables/0,
on_backup_table_imported/2
]).
-record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}).
@ -80,8 +83,45 @@ topics() ->
%%--------------------------------------------------------------------
%% Data backup
%%--------------------------------------------------------------------
backup_tables() ->
[?TAB_MESSAGE].
[?TAB_MESSAGE || is_enabled()].
on_backup_table_imported(?TAB_MESSAGE, Opts) ->
case is_enabled() of
true ->
maybe_print("Starting reindexing retained messages ~n", [], Opts),
Res = reindex(false, mk_status_fun(Opts)),
maybe_print("Reindexing retained messages finished~n", [], Opts),
Res;
false ->
ok
end;
on_backup_table_imported(_Tab, _Opts) ->
ok.
mk_status_fun(Opts) ->
fun(Done) ->
log_status(Done),
maybe_print("Reindexed ~p messages~n", [Done], Opts)
end.
maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) ->
Fun(Fmt, Args);
maybe_print(_Fmt, _Args, _Opts) ->
ok.
log_status(Done) ->
?SLOG(
info,
#{
msg => "retainer_message_record_reindexing_progress",
done => Done
}
).
is_enabled() ->
emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE.
%%--------------------------------------------------------------------
%% emqx_retainer callbacks