Merge pull request #13293 from SergeTupchiy/EMQX-12345-reindex-retained-msgs-on-import

reindex retained msgs on import
This commit is contained in:
SergeTupchiy 2024-06-19 11:23:08 +03:00 committed by GitHub
commit 0d098a01ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 143 additions and 8 deletions

View File

@ -18,6 +18,8 @@
-type traverse_break_reason() :: over | migrate. -type traverse_break_reason() :: over | migrate.
-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}.
-callback backup_tables() -> [mria:table()]. -callback backup_tables() -> [mria:table()].
%% validate the backup %% validate the backup
@ -31,6 +33,9 @@
-callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}. -callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]). %% NOTE: currently, this is called only when the table has been restored successfully.
-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}.
-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]).
-export_type([traverse_break_reason/0]). -export_type([traverse_break_reason/0]).

View File

@ -408,9 +408,13 @@ export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) ->
do_export_mnesia_tab(TabName, BackupName) -> do_export_mnesia_tab(TabName, BackupName) ->
Node = node(), Node = node(),
try try
{ok, TabName, [Node]} = mnesia:activate_checkpoint( Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}],
[{name, TabName}, {min, [TabName]}, {allow_remote, false}] Opts =
), case mnesia:table_info(TabName, storage_type) of
ram_copies -> [{ram_overrides_dump, true} | Opts0];
_ -> Opts0
end,
{ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts),
MnesiaBackupName = mnesia_backup_name(BackupName, TabName), MnesiaBackupName = mnesia_backup_name(BackupName, TabName),
ok = filelib:ensure_dir(MnesiaBackupName), ok = filelib:ensure_dir(MnesiaBackupName),
ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName), ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName),
@ -549,6 +553,8 @@ import_mnesia_tabs(BackupDir, Opts) ->
) )
). ).
-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) ->
ok | {ok, no_backup_file} | {error, term()} | no_return().
import_mnesia_tab(BackupDir, Mod, TabName, Opts) -> import_mnesia_tab(BackupDir, Mod, TabName, Opts) ->
MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName),
case filelib:is_regular(MnesiaBackupFileName) of case filelib:is_regular(MnesiaBackupFileName) of
@ -572,7 +578,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]), Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]),
case Restored of case Restored of
{atomic, [TabName]} -> {atomic, [TabName]} ->
ok; on_table_imported(Mod, TabName, Opts);
RestoreErr -> RestoreErr ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_restore_mnesia_backup", msg => "failed_to_restore_mnesia_backup",
@ -598,6 +604,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) ->
_ = file:delete(MnesiaBackupFileName) _ = file:delete(MnesiaBackupFileName)
end. end.
%% Runs the optional `on_backup_table_imported/2' callback of `Mod' for the
%% table `Tab'.
%%
%% Returns `ok' when `Mod' does not export the callback. Otherwise returns the
%% callback's own result; if the callback raises, the exception is logged and
%% `{error, Reason}' is returned instead of propagating it.
on_table_imported(Mod, Tab, Opts) ->
    HasCallback = erlang:function_exported(Mod, on_backup_table_imported, 2),
    case HasCallback of
        false ->
            ok;
        true ->
            try
                Mod:on_backup_table_imported(Tab, Opts)
            catch
                Kind:Error:Trace ->
                    %% Any exception class (error/exit/throw) is reported the same way.
                    ?SLOG(error, #{
                        msg => "post_database_import_callback_failed",
                        table => Tab,
                        module => Mod,
                        exception => Kind,
                        reason => Error,
                        stacktrace => Trace
                    }),
                    {error, Error}
            end
    end.
%% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
%% Looks like there is no clean way to abort traversal without triggering any error reporting, %% Looks like there is no clean way to abort traversal without triggering any error reporting,
%% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -114,6 +115,64 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) ->
end, end,
ok. ok.
%% Imports a pre-built backup archive from the suite's data dir and checks that:
%%   * the import reports no DB or config errors,
%%   * individual retained messages can be read back by topic,
%%   * a paged wildcard read returns all five messages (the re-indexing check),
%%   * a fresh export can be imported again without errors.
t_import_retained_messages(Config) ->
    DataDir = ?config(data_dir, Config),
    Archive = filename:join(DataDir, "emqx-export-ce-retained-msgs-test.tar.gz"),
    NoErrors = {ok, #{db_errors => #{}, config_errors => #{}}},
    ?assertEqual(NoErrors, emqx_mgmt_data_backup:import(Archive)),
    %% verify that retainer messages are imported
    lists:foreach(
        fun({Topic, Payload}) ->
            ?assertMatch(
                {ok, [#message{payload = Payload}]},
                emqx_retainer:read_message(Topic)
            )
        end,
        [
            {<<"t/backup-retainer/test1">>, <<"Hi 1!!!">>},
            {<<"t/backup-retainer/test5">>, <<"Hi 5!!!">>}
        ]
    ),
    %% verify that messages are re-indexed
    WildcardTopic = <<"t/backup-retainer/#">>,
    ?assertMatch(
        {ok, _, [
            #message{payload = <<"Hi 5!!!">>},
            #message{payload = <<"Hi 4!!!">>},
            #message{payload = <<"Hi 3!!!">>},
            #message{payload = <<"Hi 2!!!">>},
            #message{payload = <<"Hi 1!!!">>}
        ]},
        emqx_retainer:page_read(WildcardTopic, 1, 5)
    ),
    %% Export and import again
    {ok, #{filename := ExportedFile}} = emqx_mgmt_data_backup:export(),
    ?assertEqual(NoErrors, emqx_mgmt_data_backup:import(ExportedFile)).
%% Switches the retainer backend to `ram' storage, publishes one retained
%% message, exports a backup, deletes the message, and then checks that
%% importing the backup brings the message back.
t_export_ram_retained_messages(_Config) ->
    RamConf = #{
        <<"enable">> => true,
        <<"backend">> => #{<<"storage_type">> => <<"ram">>}
    },
    {ok, _} = emqx_retainer:update_config(RamConf),
    %% The message table must actually be RAM-only for this test to be meaningful.
    ?assertEqual(ram_copies, mnesia:table_info(emqx_retainer_message, storage_type)),
    Topic = <<"t/backup_test_export_retained_ram/1">>,
    Payload = <<"backup_test_retained_ram">>,
    Flags = #{retain => true},
    Retained = emqx_message:make(<<"backup_test">>, ?QOS_0, Topic, Payload, Flags, #{}),
    _ = emqx_broker:publish(Retained),
    {ok, #{filename := ExportedFile}} = emqx_mgmt_data_backup:export(),
    %% Remove the message so that a later successful read can only come from the import.
    ok = emqx_retainer:delete(Topic),
    ?assertEqual({ok, []}, emqx_retainer:read_message(Topic)),
    NoErrors = {ok, #{db_errors => #{}, config_errors => #{}}},
    ?assertEqual(NoErrors, emqx_mgmt_data_backup:import(ExportedFile)),
    ?assertMatch({ok, [#message{payload = Payload}]}, emqx_retainer:read_message(Topic)).
t_cluster_hocon_export_import(Config) -> t_cluster_hocon_export_import(Config) ->
RawConfBeforeImport = emqx:get_raw_config([]), RawConfBeforeImport = emqx:get_raw_config([]),
BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP),

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [ {application, emqx_retainer, [
{description, "EMQX Retainer"}, {description, "EMQX Retainer"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.23"}, {vsn, "5.0.24"},
{modules, []}, {modules, []},
{registered, [emqx_retainer_sup]}, {registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -56,7 +56,10 @@
-export([populate_index_meta/0]). -export([populate_index_meta/0]).
-export([reindex/3]). -export([reindex/3]).
-export([backup_tables/0]). -export([
backup_tables/0,
on_backup_table_imported/2
]).
-record(retained_message, {topic, msg, expiry_time}). -record(retained_message, {topic, msg, expiry_time}).
-record(retained_index, {key, expiry_time}). -record(retained_index, {key, expiry_time}).
@ -80,8 +83,45 @@ topics() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Data backup %% Data backup
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
backup_tables() -> backup_tables() ->
[?TAB_MESSAGE]. [?TAB_MESSAGE || is_enabled()].
%% Post-import hook invoked with the name of a table and the import options.
%%
%% For the retained-message table, and only when `is_enabled/0' holds, the
%% messages are re-indexed via `reindex/2'; progress is reported through the
%% optional `print_fun' in `Opts'. The result of `reindex/2' is returned.
%% For any other table, or when `is_enabled/0' is false, this is a no-op
%% returning `ok'.
on_backup_table_imported(?TAB_MESSAGE, Opts) ->
    reindex_after_import(is_enabled(), Opts);
on_backup_table_imported(_Tab, _Opts) ->
    ok.

%% Re-indexes the imported messages when the first argument is `true'.
reindex_after_import(false, _Opts) ->
    ok;
reindex_after_import(true, Opts) ->
    maybe_print("Starting reindexing retained messages ~n", [], Opts),
    %% NOTE(review): the meaning of the `false' argument is defined by
    %% reindex/2, which is not visible here.
    Outcome = reindex(false, mk_status_fun(Opts)),
    maybe_print("Reindexing retained messages finished~n", [], Opts),
    Outcome.
%% Builds a one-argument progress callback. Each invocation logs the progress
%% value and also prints it through the optional `print_fun' in `Opts'.
mk_status_fun(Opts) ->
    fun(Count) -> report_reindex_progress(Count, Opts) end.

%% Logs and (optionally) prints a single progress update.
report_reindex_progress(Count, Opts) ->
    log_status(Count),
    maybe_print("Reindexed ~p messages~n", [Count], Opts).
%% Calls the `print_fun' found in `Opts' with `Fmt' and `Args' and returns its
%% result. Returns `ok' without printing when `Opts' has no `print_fun' key,
%% when the value is not a fun of arity 2, or when `Opts' is not a map.
maybe_print(Fmt, Args, Opts) ->
    case Opts of
        #{print_fun := Print} when is_function(Print, 2) ->
            Print(Fmt, Args);
        _ ->
            ok
    end.
%% Emits an info-level structured log entry carrying the given progress value
%% under the `done' key.
log_status(Count) ->
    ?SLOG(info, #{msg => "retainer_message_record_reindexing_progress", done => Count}).
%% True only when the retainer is enabled and this module is the configured
%% backend module. The backend is queried only if the retainer is enabled.
is_enabled() ->
    UsesThisBackend = fun() -> emqx_retainer:backend_module() =:= ?MODULE end,
    emqx_retainer:enabled() andalso UsesThisBackend().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% emqx_retainer callbacks %% emqx_retainer callbacks

View File

@ -0,0 +1,4 @@
- Automatically re-index imported retained messages when restoring a data backup file. Previously, re-indexing had to be triggered manually with the `emqx ctl retainer reindex start` CLI command
after the data backup file was imported.
- Allow exporting retained messages to a backup file if the configured storage_type (`retainer.backend.storage_type`) is `ram`. Previously, retained messages could be exported only if `disc` storage_type was configured.