From d1cd5dd817ee1912e9013a82d690c3a8a33dd526 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 30 Oct 2023 15:57:13 +0200 Subject: [PATCH 1/5] fix(emqx_mgmt_data_backup): upgrade raw conf before validating and importing --- .../emqx_management/src/emqx_mgmt_data_backup.erl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 0717e8285..3e0b0a534 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -462,11 +462,12 @@ import_cluster_hocon(BackupDir, Opts) -> case filelib:is_regular(HoconFileName) of true -> {ok, RawConf} = hocon:files([HoconFileName]), - {ok, _} = validate_cluster_hocon(RawConf), + RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf), + {ok, _} = validate_cluster_hocon(RawConf1), maybe_print("Importing cluster configuration...~n", [], Opts), %% At this point, when all validations have been passed, we want to log errors (if any) %% but proceed with the next items, instead of aborting the whole import operation - do_import_conf(RawConf, Opts); + do_import_conf(RawConf1, Opts); false -> maybe_print("No cluster configuration to be imported.~n", [], Opts), ?SLOG(info, #{ @@ -476,6 +477,16 @@ import_cluster_hocon(BackupDir, Opts) -> #{} end. +upgrade_raw_conf(SchemaMod, RawConf) -> + _ = SchemaMod:module_info(), + case erlang:function_exported(SchemaMod, upgrade_raw_conf, 1) of + true -> + %% TODO make it a schema module behaviour in hocon_schema + apply(SchemaMod, upgrade_raw_conf, [RawConf]); + false -> + RawConf + end. + read_data_files(RawConf) -> DataDir = bin(emqx:data_dir()), {ok, Cwd} = file:get_cwd(), From 0935bb6225bd5e562c06879f212bef00405f8585 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 30 Oct 2023 15:58:06 +0200 Subject: [PATCH 2/5] fix(emqx_connector): fix badarity error --- apps/emqx_connector/src/emqx_connector.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 81d20b647..f07e038d2 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -134,7 +134,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> Result = perform_connector_changes([ #{action => fun emqx_connector_resource:remove/4, data => Removed}, #{ - action => fun emqx_connector_resource:create/3, + action => fun emqx_connector_resource:create/4, data => Added, on_exception_fn => fun emqx_connector_resource:remove/4 }, From d94193ac15c58081b2445b94e612032e17c38e24 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 30 Oct 2023 15:58:48 +0200 Subject: [PATCH 3/5] fix(emqx_connector_schema): add kafka alias for kafka_producer --- apps/emqx_connector/src/schema/emqx_connector_schema.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 9aad0ecc7..ab2213793 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -56,7 +56,7 @@ enterprise_fields_connectors() -> []. -endif. -connector_type_to_bridge_types(kafka_producer) -> [kafka_producer]; +connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub]. actions_config_name() -> <<"bridges_v2">>. From bc8c29182005c9decd84510d9c2569b7e13cacf8 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 30 Oct 2023 17:46:05 +0200 Subject: [PATCH 4/5] fix(emqx_bridge): add import_config/1 cb to emqx_bridge_v2 --- apps/emqx_bridge/src/emqx_bridge.erl | 15 ++++++++++----- apps/emqx_bridge/src/emqx_bridge_v2.erl | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fd0ce0d31..a3e06a819 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -62,7 +62,9 @@ %% Data backup -export([ - import_config/1 + import_config/1, + %% exported for emqx_bridge_v2 + import_config/4 ]). -export([query_opts/1]). @@ -442,15 +444,18 @@ do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> %%---------------------------------------------------------------------------------------- import_config(RawConf) -> - RootKeyPath = config_key_path(), - BridgesConf = maps:get(<<"bridges">>, RawConf, #{}), + import_config(RawConf, <<"bridges">>, ?ROOT_KEY, config_key_path()). + +%% Used in emqx_bridge_v2 +import_config(RawConf, RawConfKey, RootKey, RootKeyPath) -> + BridgesConf = maps:get(RawConfKey, RawConf, #{}), OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}), MergedConf = merge_confs(OldBridgesConf, BridgesConf), case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of {ok, #{raw_config := NewRawConf}} -> - {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}}; + {ok, #{root_key => RootKey, changed => changed_paths(OldBridgesConf, NewRawConf)}}; Error -> - {error, #{root_key => ?ROOT_KEY, reason => Error}} + {error, #{root_key => RootKey, reason => Error}} end. merge_confs(OldConf, NewConf) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 7a2d112ab..4be0ead57 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -16,7 +16,7 @@ -module(emqx_bridge_v2). -behaviour(emqx_config_handler). -% -behaviour(emqx_config_backup). +-behaviour(emqx_config_backup). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -77,6 +77,11 @@ pre_config_update/3 ]). +%% Data backup +-export([ + import_config/1 +]). + %% Compatibility API -export([ @@ -731,6 +736,14 @@ bridge_v2_type_to_connector_type(kafka_producer) -> bridge_v2_type_to_connector_type(azure_event_hub) -> azure_event_hub. +%%==================================================================== +%% Data backup API +%%==================================================================== + +import_config(RawConf) -> + %% bridges v2 structure + emqx_bridge:import_config(RawConf, <<"bridges_v2">>, ?ROOT_KEY, config_key_path()). + %%==================================================================== %% Config Update Handler API %%==================================================================== From 0562585c8fbc56c23c1a6d74a7c5cf0e76e30c75 Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Mon, 30 Oct 2023 17:47:03 +0200 Subject: [PATCH 5/5] fix(emqx_mgmt_data_backup): implement importer modules ordering `emqx_bridge_v2` depends on `emqx_connector`, so connectors must be imported first. --- .../src/emqx_mgmt_data_backup.erl | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 3e0b0a534..5bf036e35 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -61,6 +61,12 @@ <<"slow_subs">> ]). +%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first +-define(IMPORT_ORDER, [ + emqx_connector, + emqx_bridge_v2 +]). + -define(DEFAULT_OPTS, #{}). -define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX). -define(fmt_tar_err(_Expr_), @@ -534,7 +540,7 @@ do_import_conf(RawConf, Opts) -> GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))), maybe_print_errors(GenConfErrs, Opts), Errors = - lists:foldr( + lists:foldl( fun(Module, ErrorsAcc) -> case Module:import_config(RawConf) of {ok, #{changed := Changed}} -> @@ -545,11 +551,27 @@ do_import_conf(RawConf, Opts) -> end end, GenConfErrs, - find_behaviours(emqx_config_backup) + sort_importer_modules(find_behaviours(emqx_config_backup)) ), maybe_print_errors(Errors, Opts), Errors. +sort_importer_modules(Modules) -> + lists:sort( + fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end, + Modules + ). + +order(Elem, List) -> + order(Elem, List, 0). + +order(_Elem, [], Order) -> + Order; +order(Elem, [Elem | _], Order) -> + Order; +order(Elem, [_ | T], Order) -> + order(Elem, T, Order + 1). + import_generic_conf(Data) -> lists:map( fun(Key) ->