diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index bdb9cf666..b83a46903 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -57,7 +57,8 @@ <<"flapping_detect">>, <<"broker">>, <<"force_gc">>, - <<"zones">> + <<"zones">>, + <<"slow_subs">> ]). -define(DEFAULT_OPTS, #{}). diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 1b934e015..4de1c2e9b 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_modules, [ {description, "EMQX Modules"}, - {vsn, "5.0.17"}, + {vsn, "5.0.18"}, {modules, []}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, {mod, {emqx_modules_app, []}}, diff --git a/apps/emqx_modules/src/emqx_modules_conf.erl b/apps/emqx_modules/src/emqx_modules_conf.erl index 69a69cb12..ebe9b83df 100644 --- a/apps/emqx_modules/src/emqx_modules_conf.erl +++ b/apps/emqx_modules/src/emqx_modules_conf.erl @@ -18,6 +18,7 @@ -module(emqx_modules_conf). -behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). %% Load/Unload -export([ @@ -37,6 +38,11 @@ post_config_update/5 ]). +%% Data backup +-export([ + import_config/1 +]). + %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- @@ -78,6 +84,20 @@ remove_topic_metrics(Topic) -> {error, Reason} -> {error, Reason} end. +%%-------------------------------------------------------------------- +%% Data backup (Topic-Metrics) +%%-------------------------------------------------------------------- + +import_config(#{<<"topic_metrics">> := Topics}) -> + case emqx_conf:update([topic_metrics], {merge_topics, Topics}, #{override_to => cluster}) of + {ok, _} -> + {ok, #{root_key => topic_metrics, changed => []}}; + Error -> + {error, #{root_key => topic_metrics, reason => Error}} + end; +import_config(_RawConf) -> + {ok, #{root_key => topic_metrics, changed => []}}. + %%-------------------------------------------------------------------- %% Config Handler %%-------------------------------------------------------------------- @@ -103,7 +123,13 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) -> {ok, RawConf -- [Topic]}; _ -> {error, not_found} - end. + end; +pre_config_update(_, {merge_topics, NewConf}, OldConf) -> + KeyFun = fun(#{<<"topic">> := T}) -> T end, + MergedConf = emqx_utils:merge_lists(OldConf, NewConf, KeyFun), + {ok, MergedConf}; +pre_config_update(_, NewConf, _OldConf) -> + {ok, NewConf}. -spec post_config_update( list(atom()), @@ -113,7 +139,6 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) -> emqx_config:app_envs() ) -> ok | {ok, Result :: any()} | {error, Reason :: term()}. - post_config_update( _, {add_topic_metrics, Topic}, @@ -135,6 +160,19 @@ post_config_update( case emqx_topic_metrics:deregister(Topic) of ok -> ok; {error, Reason} -> {error, Reason} + end; +post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) -> + #{ + removed := Removed, + added := Added + } = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end), + Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed], + Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added], + DeregisteredErrs = [Res || Res <- Deregistered, Res =/= ok, Res =/= {error, topic_not_found}], + RegisteredErrs = [Res || Res <- Registered, Res =/= ok, Res =/= {error, already_existed}], + case DeregisteredErrs ++ RegisteredErrs of + [] -> ok; + Errs -> {error, Errs} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl index 14e477bf9..b95cc2fe3 100644 --- a/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl +++ b/apps/emqx_modules/test/emqx_modules_conf_SUITE.erl @@ -39,12 +39,46 @@ end_per_suite(_Conf) -> init_per_testcase(_CaseName, Conf) -> Conf. +end_per_testcase(_CaseName, _Conf) -> + [emqx_modules_conf:remove_topic_metrics(T) || T <- emqx_modules_conf:topic_metrics()], + ok. + %%-------------------------------------------------------------------- %% Cases %%-------------------------------------------------------------------- -t_topic_metrics_list(_) -> - ok. - t_topic_metrics_add_remove(_) -> - ok. + ?assertEqual([], emqx_modules_conf:topic_metrics()), + ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic">>)), + ?assertEqual([<<"test-topic">>], emqx_modules_conf:topic_metrics()), + ?assertEqual(ok, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)), + ?assertEqual([], emqx_modules_conf:topic_metrics()), + ?assertMatch({error, _}, emqx_modules_conf:remove_topic_metrics(<<"test-topic">>)). + +t_topic_metrics_merge_update(_) -> + ?assertEqual([], emqx_modules_conf:topic_metrics()), + ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import1">>)), + ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-import2">>)), + ImportConf = #{ + <<"topic_metrics">> => + [ + #{<<"topic">> => <<"imported_topic1">>}, + #{<<"topic">> => <<"imported_topic2">>} + ] + }, + ?assertMatch({ok, _}, emqx_modules_conf:import_config(ImportConf)), + ExpTopics = [ + <<"test-topic-before-import1">>, + <<"test-topic-before-import2">>, + <<"imported_topic1">>, + <<"imported_topic2">> + ], + ?assertEqual(ExpTopics, emqx_modules_conf:topic_metrics()). + +t_topic_metrics_update(_) -> + ?assertEqual([], emqx_modules_conf:topic_metrics()), + ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update1">>)), + ?assertMatch({ok, _}, emqx_modules_conf:add_topic_metrics(<<"test-topic-before-update2">>)), + UpdConf = [#{<<"topic">> => <<"new_topic1">>}, #{<<"topic">> => <<"new_topic2">>}], + ?assertMatch({ok, _}, emqx_conf:update([topic_metrics], UpdConf, #{override_to => cluster})), + ?assertEqual([<<"new_topic1">>, <<"new_topic2">>], emqx_modules_conf:topic_metrics()). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f0640c7dc..66c82d3a1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). -behaviour(emqx_config_handler). --behaiour(emqx_config_backup). +-behaviour(emqx_config_backup). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). diff --git a/changes/ce/fix-11322.en.md b/changes/ce/fix-11322.en.md new file mode 100644 index 000000000..fca72c01a --- /dev/null +++ b/changes/ce/fix-11322.en.md @@ -0,0 +1,4 @@ +Import additional configurations from EMQX backup file (`emqx ctl import` command): + - rule_engine (previously not imported due to the bug) + - topic_metrics (previously not implemented) + - slow_subs (previously not implemented).