diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index fd0ce0d31..a3e06a819 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -62,7 +62,9 @@ %% Data backup -export([ - import_config/1 + import_config/1, + %% exported for emqx_bridge_v2 + import_config/4 ]). -export([query_opts/1]). @@ -442,15 +444,18 @@ do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) -> %%---------------------------------------------------------------------------------------- import_config(RawConf) -> - RootKeyPath = config_key_path(), - BridgesConf = maps:get(<<"bridges">>, RawConf, #{}), + import_config(RawConf, <<"bridges">>, ?ROOT_KEY, config_key_path()). + +%% Used in emqx_bridge_v2 +import_config(RawConf, RawConfKey, RootKey, RootKeyPath) -> + BridgesConf = maps:get(RawConfKey, RawConf, #{}), OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}), MergedConf = merge_confs(OldBridgesConf, BridgesConf), case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of {ok, #{raw_config := NewRawConf}} -> - {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}}; + {ok, #{root_key => RootKey, changed => changed_paths(OldBridgesConf, NewRawConf)}}; Error -> - {error, #{root_key => ?ROOT_KEY, reason => Error}} + {error, #{root_key => RootKey, reason => Error}} end. merge_confs(OldConf, NewConf) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index fc70ee024..b8aa62ef6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -16,7 +16,7 @@ -module(emqx_bridge_v2). -behaviour(emqx_config_handler). -% -behaviour(emqx_config_backup). +-behaviour(emqx_config_backup). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -77,6 +77,11 @@ pre_config_update/3 ]). +%% Data backup +-export([ + import_config/1 +]). + %% Compatibility API -export([ @@ -735,6 +740,14 @@ bridge_v2_type_to_connector_type(kafka_producer) -> bridge_v2_type_to_connector_type(azure_event_hub) -> azure_event_hub. +%%==================================================================== +%% Data backup API +%%==================================================================== + +import_config(RawConf) -> + %% bridges v2 structure + emqx_bridge:import_config(RawConf, <<"bridges_v2">>, ?ROOT_KEY, config_key_path()). + %%==================================================================== %% Config Update Handler API %%==================================================================== diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 81d20b647..f07e038d2 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -134,7 +134,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> Result = perform_connector_changes([ #{action => fun emqx_connector_resource:remove/4, data => Removed}, #{ - action => fun emqx_connector_resource:create/3, + action => fun emqx_connector_resource:create/4, data => Added, on_exception_fn => fun emqx_connector_resource:remove/4 }, diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 9aad0ecc7..ab2213793 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -56,7 +56,7 @@ enterprise_fields_connectors() -> []. -endif. -connector_type_to_bridge_types(kafka_producer) -> [kafka_producer]; +connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub]. actions_config_name() -> <<"bridges_v2">>. diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 0717e8285..5bf036e35 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -61,6 +61,12 @@ <<"slow_subs">> ]). +%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first +-define(IMPORT_ORDER, [ + emqx_connector, + emqx_bridge_v2 +]). + -define(DEFAULT_OPTS, #{}). -define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX). -define(fmt_tar_err(_Expr_), @@ -462,11 +468,12 @@ import_cluster_hocon(BackupDir, Opts) -> case filelib:is_regular(HoconFileName) of true -> {ok, RawConf} = hocon:files([HoconFileName]), - {ok, _} = validate_cluster_hocon(RawConf), + RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf), + {ok, _} = validate_cluster_hocon(RawConf1), maybe_print("Importing cluster configuration...~n", [], Opts), %% At this point, when all validations have been passed, we want to log errors (if any) %% but proceed with the next items, instead of aborting the whole import operation - do_import_conf(RawConf, Opts); + do_import_conf(RawConf1, Opts); false -> maybe_print("No cluster configuration to be imported.~n", [], Opts), ?SLOG(info, #{ @@ -476,6 +483,16 @@ import_cluster_hocon(BackupDir, Opts) -> #{} end. +upgrade_raw_conf(SchemaMod, RawConf) -> + _ = SchemaMod:module_info(), + case erlang:function_exported(SchemaMod, upgrade_raw_conf, 1) of + true -> + %% TODO make it a schema module behaviour in hocon_schema + apply(SchemaMod, upgrade_raw_conf, [RawConf]); + false -> + RawConf + end. + read_data_files(RawConf) -> DataDir = bin(emqx:data_dir()), {ok, Cwd} = file:get_cwd(), @@ -523,7 +540,7 @@ do_import_conf(RawConf, Opts) -> GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))), maybe_print_errors(GenConfErrs, Opts), Errors = - lists:foldr( + lists:foldl( fun(Module, ErrorsAcc) -> case Module:import_config(RawConf) of {ok, #{changed := Changed}} -> @@ -534,11 +551,27 @@ do_import_conf(RawConf, Opts) -> end end, GenConfErrs, - find_behaviours(emqx_config_backup) + sort_importer_modules(find_behaviours(emqx_config_backup)) ), maybe_print_errors(Errors, Opts), Errors. +sort_importer_modules(Modules) -> + lists:sort( + fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end, + Modules + ). + +order(Elem, List) -> + order(Elem, List, 0). + +order(_Elem, [], Order) -> + Order; +order(Elem, [Elem | _], Order) -> + Order; +order(Elem, [_ | T], Order) -> + order(Elem, T, Order + 1). + import_generic_conf(Data) -> lists:map( fun(Key) ->