From 60882a616ebea7bca3c381ffdab3de22e707c3c0 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 18 Jun 2024 16:14:13 +0300 Subject: [PATCH 1/3] fix(data_backup): re-index retained messages after importing --- apps/emqx/src/bhvrs/emqx_db_backup.erl | 7 ++- .../src/emqx_mgmt_data_backup.erl | 25 +++++++++- .../test/emqx_mgmt_data_backup_SUITE.erl | 30 ++++++++++++ .../emqx-export-ce-retained-msgs-test.tar.gz | Bin 0 -> 1928 bytes apps/emqx_retainer/src/emqx_retainer.app.src | 2 +- .../src/emqx_retainer_mnesia.erl | 44 +++++++++++++++++- 6 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz diff --git a/apps/emqx/src/bhvrs/emqx_db_backup.erl b/apps/emqx/src/bhvrs/emqx_db_backup.erl index 310baeb9b..4b8107565 100644 --- a/apps/emqx/src/bhvrs/emqx_db_backup.erl +++ b/apps/emqx/src/bhvrs/emqx_db_backup.erl @@ -18,6 +18,8 @@ -type traverse_break_reason() :: over | migrate. +-type opts() :: #{print_fun => fun((io:format(), [term()]) -> ok)}. + -callback backup_tables() -> [mria:table()]. %% validate the backup @@ -31,6 +33,9 @@ -callback migrate_mnesia_backup(tuple()) -> {ok, tuple()} | {error, term()}. --optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1]). +%% NOTE: currently, this is called only when the table has been restored successfully. +-callback on_backup_table_imported(mria:table(), opts()) -> ok | {error, term()}. + +-optional_callbacks([validate_mnesia_backup/1, migrate_mnesia_backup/1, on_backup_table_imported/2]). -export_type([traverse_break_reason/0]). diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index c919aa506..ab0deaa87 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -549,6 +549,8 @@ import_mnesia_tabs(BackupDir, Opts) -> ) ). +-spec import_mnesia_tab(file:filename_all(), module(), mria:table(), map()) -> + ok | {ok, no_backup_file} | {error, term()} | no_return(). import_mnesia_tab(BackupDir, Mod, TabName, Opts) -> MnesiaBackupFileName = mnesia_backup_name(BackupDir, TabName), case filelib:is_regular(MnesiaBackupFileName) of @@ -572,7 +574,7 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> Restored = mnesia:restore(BackupFile, [{default_op, keep_tables}]), case Restored of {atomic, [TabName]} -> - ok; + on_table_imported(Mod, TabName, Opts); RestoreErr -> ?SLOG(error, #{ msg => "failed_to_restore_mnesia_backup", @@ -598,6 +600,27 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, Mod, TabName, Opts) -> _ = file:delete(MnesiaBackupFileName) end. +on_table_imported(Mod, Tab, Opts) -> + case erlang:function_exported(Mod, on_backup_table_imported, 2) of + true -> + try + Mod:on_backup_table_imported(Tab, Opts) + catch + Class:Reason:Stack -> + ?SLOG(error, #{ + msg => "post_database_import_callback_failed", + table => Tab, + module => Mod, + exception => Class, + reason => Reason, + stacktrace => Stack + }), + {error, Reason} + end; + false -> + ok + end. + %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema. %% Looks like there is no clean way to abort traversal without triggering any error reporting, %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided... diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index fee392479..fb5d5988c 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -114,6 +114,36 @@ t_cluster_hocon_import_mqtt_subscribers_retainer_messages(Config) -> end, ok. +t_import_retained_messages(Config) -> + FName = "emqx-export-ce-retained-msgs-test.tar.gz", + BackupFile = filename:join(?config(data_dir, Config), FName), + Exp = {ok, #{db_errors => #{}, config_errors => #{}}}, + ?assertEqual(Exp, emqx_mgmt_data_backup:import(BackupFile)), + %% verify that retainer messages are imported + ?assertMatch( + {ok, [#message{payload = <<"Hi 1!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test1">>) + ), + ?assertMatch( + {ok, [#message{payload = <<"Hi 5!!!">>}]}, + emqx_retainer:read_message(<<"t/backup-retainer/test5">>) + ), + + %% verify that messages are re-indexed + ?assertMatch( + {ok, _, [ + #message{payload = <<"Hi 5!!!">>}, + #message{payload = <<"Hi 4!!!">>}, + #message{payload = <<"Hi 3!!!">>}, + #message{payload = <<"Hi 2!!!">>}, + #message{payload = <<"Hi 1!!!">>} + ]}, + emqx_retainer:page_read(<<"t/backup-retainer/#">>, 1, 5) + ), + %% Export and import again + {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), + ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)). + t_cluster_hocon_export_import(Config) -> RawConfBeforeImport = emqx:get_raw_config([]), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE_data/emqx-export-ce-retained-msgs-test.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..6442a03a1b76a9b17e78f02b2440c4c794e12a29 GIT binary patch literal 1928 zcmV;32Y2`%iwFP!000001MOT*Y#T)s-q?=qI8BpQR4EiBw?fc@l6e2@atLiDT0l^^ zRHCP@#=Dbv6MJ{NJI=;!qpISB5E7+Gg$ojg9*`9Ve^wGG4y5xvRvAy|E!P^0{!m>`a9Q$*CMMv?e6W*s`R8u3%^dOqOg@B^a{e1vI0> zb;<6ZLV1fO^{bkK=V?6=ODh=kVkoc6C=V@Y1Z$*HbkM{yG$T3o4S;D9WSb@W2#63< z%@&FK>_869RTX>4$|2We8Qwy>AI}pL(Kd#Q;Cqr`>IODR*^uEW6HWWlwp&DpR9uzp zumf3POqQ=l95dmG<)Ud~O)dfFW)aWODx$Kk&#GWRF093w{K2^kUq1Zvs21-#@Xr!O z4ApQAcg4!%2DmMRLWr4}*+`EL&e@XjH04qr-rs_^LLg*M^6F zJLcvpc7h5%*gT6%aE(onVdpwxp**p-sFq|EArR2w8yZIr?E1lrBWL~#;>c!4i~4UE zYiOhC_@Cu!#{VK0=zllRA^o5H9;&|CVEk_wb;tjPv8?dno>2B`yL`ePE1MoehU&=p zFuib>exw|-a&{DJD4WOSbs#$d@2E=k9}znNr4Vth+m!lR`M&hr^g>y_A6p9KyXVoG z{+F{&sQ_;eTCE;7(SI?;iT?3_u>ad7bV&b4Z^8KglwbdM)ODhj!JvMb9v%CSc<)=4 zoi!T-{REy;w!`pB+JjCQC`m*u;B^@0hv*Y4Rpa~tn0D!sh2}7HXLxa@Y8nuRxT6;g z6CM;Rji&^Vv&~^@RkMW~N8UP94mbaQcrLkt8!n!L4el8|{Rg}`|- zsT5(H1wN%{l)K-Kx>dV$(CoM!H!wD-Gk|FkbQUr2=>t=KQ6S6Ii7xATJ3=eAomtB-L3rP&mG+GG6KrevJ=80THaFJJEn<3fGLcYeFz zFh1yGoZFvzMi7Llwiy5V*-go~;4v<^j6d>SM;I6DGG6@Svo{>Z<37ew7Ku~5sI0_Q^*FrVw)Ok+5cbt&cBPM`#)*_`CpzFgZ00zphNn9ItK!? z!>|9_YMf|g7^uzf(RIIj?fFM4d;UWPvMih4}y^G#f|Mg(2|Nca4`kzG_ zWVYHy^ZoyfPyadkUqS-??*clc|Fh?y>RW#OAFOhsmC9e8(_=G$=(1Hcqkre@Zzx-$ z>q*Fh;eH4~Ek%u}`3)8JYu{Wcm4*Foa0sBMu@?Q8oYU&7Z=VzZ5(0BQRB9~uAo zFP0U9_5ZHG+Z0TmP9&35>#?RJPkSOlr8ha5qyoAC`RJ==T$Pyhh2WZ1<3 literal 0 HcmV?d00001 diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 7bcde8d50..4f778769d 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index daaa776b7..cb9aa14f7 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -56,7 +56,10 @@ -export([populate_index_meta/0]). -export([reindex/3]). --export([backup_tables/0]). +-export([ + backup_tables/0, + on_backup_table_imported/2 +]). -record(retained_message, {topic, msg, expiry_time}). -record(retained_index, {key, expiry_time}). @@ -80,8 +83,45 @@ topics() -> %%-------------------------------------------------------------------- %% Data backup %%-------------------------------------------------------------------- + backup_tables() -> - [?TAB_MESSAGE]. + [?TAB_MESSAGE || is_enabled()]. + +on_backup_table_imported(?TAB_MESSAGE, Opts) -> + case is_enabled() of + true -> + maybe_print("Starting reindexing retained messages ~n", [], Opts), + Res = reindex(false, mk_status_fun(Opts)), + maybe_print("Reindexing retained messages finished~n", [], Opts), + Res; + false -> + ok + end; +on_backup_table_imported(_Tab, _Opts) -> + ok. + +mk_status_fun(Opts) -> + fun(Done) -> + log_status(Done), + maybe_print("Reindexed ~p messages~n", [Done], Opts) + end. + +maybe_print(Fmt, Args, #{print_fun := Fun}) when is_function(Fun, 2) -> + Fun(Fmt, Args); +maybe_print(_Fmt, _Args, _Opts) -> + ok. + +log_status(Done) -> + ?SLOG( + info, + #{ + msg => "retainer_message_record_reindexing_progress", + done => Done + } + ). + +is_enabled() -> + emqx_retainer:enabled() andalso emqx_retainer:backend_module() =:= ?MODULE. %%-------------------------------------------------------------------- %% emqx_retainer callbacks From baa79962891ff3a597d8d9dfdf465faf46e095d6 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 18 Jun 2024 17:24:53 +0300 Subject: [PATCH 2/3] fix(data_backup): allow exporting `ram_copies` Mnesia tables Currently, ram tables can be used for message retainer. --- .../src/emqx_mgmt_data_backup.erl | 10 +++++-- .../test/emqx_mgmt_data_backup_SUITE.erl | 29 +++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index ab0deaa87..50ddc33ca 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -408,9 +408,13 @@ export_mnesia_tab(TarDescriptor, TabName, BackupName, BackupBaseName, Opts) -> do_export_mnesia_tab(TabName, BackupName) -> Node = node(), try - {ok, TabName, [Node]} = mnesia:activate_checkpoint( - [{name, TabName}, {min, [TabName]}, {allow_remote, false}] - ), + Opts0 = [{name, TabName}, {min, [TabName]}, {allow_remote, false}], + Opts = + case mnesia:table_info(TabName, storage_type) of + ram_copies -> [{ram_overrides_dump, true} | Opts0]; + _ -> Opts0 + end, + {ok, TabName, [Node]} = mnesia:activate_checkpoint(Opts), MnesiaBackupName = mnesia_backup_name(BackupName, TabName), ok = filelib:ensure_dir(MnesiaBackupName), ok = mnesia:backup_checkpoint(TabName, MnesiaBackupName), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index fb5d5988c..4e584824d 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -19,6 +19,7 @@ -compile(nowarn_export_all). -include_lib("emqx_utils/include/emqx_message.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -144,6 +145,34 @@ t_import_retained_messages(Config) -> {ok, #{filename := FileName}} = emqx_mgmt_data_backup:export(), ?assertEqual(Exp, emqx_mgmt_data_backup:import(FileName)). +t_export_ram_retained_messages(_Config) -> + {ok, _} = emqx_retainer:update_config( + #{ + <<"enable">> => true, + <<"backend">> => #{<<"storage_type">> => <<"ram">>} + } + ), + ?assertEqual(ram_copies, mnesia:table_info(emqx_retainer_message, storage_type)), + Topic = <<"t/backup_test_export_retained_ram/1">>, + Payload = <<"backup_test_retained_ram">>, + Msg = emqx_message:make( + <<"backup_test">>, + ?QOS_0, + Topic, + Payload, + #{retain => true}, + #{} + ), + _ = emqx_broker:publish(Msg), + {ok, #{filename := BackupFileName}} = emqx_mgmt_data_backup:export(), + ok = emqx_retainer:delete(Topic), + ?assertEqual({ok, []}, emqx_retainer:read_message(Topic)), + ?assertEqual( + {ok, #{db_errors => #{}, config_errors => #{}}}, + emqx_mgmt_data_backup:import(BackupFileName) + ), + ?assertMatch({ok, [#message{payload = Payload}]}, emqx_retainer:read_message(Topic)). + t_cluster_hocon_export_import(Config) -> RawConfBeforeImport = emqx:get_raw_config([]), BootstrapFile = filename:join(?config(data_dir, Config), ?BOOTSTRAP_BACKUP), From 12363cec4adc700558a5e45189a3df6a3858a334 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Tue, 18 Jun 2024 18:22:51 +0300 Subject: [PATCH 3/3] chore: add changelog --- changes/ce/fix-13293.en.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes/ce/fix-13293.en.md diff --git a/changes/ce/fix-13293.en.md b/changes/ce/fix-13293.en.md new file mode 100644 index 000000000..0d5734e4c --- /dev/null +++ b/changes/ce/fix-13293.en.md @@ -0,0 +1,4 @@ +- Automatically re-index imported retained messages during restoring a data backup file. Previously, it was needed to manually trigger re-indexing with `emqx ctl retainer reindex start` CLI + after the data backup file is imported. + +- Allow exporting retained messages to a backup file if the configured storage_type (`retainer.backend.storage_type`) is `ram`. Previously, retained messages could be exported only if `disc` storage_type was configured.