Merge pull request #11842 from SergeTupchiy/EMQX-11272-fix-kafka-bridge-v1-import

EMQX-11272 fix kafka bridge v1 import
This commit is contained in:
Kjell Winblad 2023-10-31 07:58:54 +01:00 committed by GitHub
commit aea449306a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 12 deletions

View File

@ -62,7 +62,9 @@
%% Data backup
-export([
import_config/1
import_config/1,
%% exported for emqx_bridge_v2
import_config/4
]).
-export([query_opts/1]).
@ -442,15 +444,18 @@ do_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
%%----------------------------------------------------------------------------------------
import_config(RawConf) ->
RootKeyPath = config_key_path(),
BridgesConf = maps:get(<<"bridges">>, RawConf, #{}),
import_config(RawConf, <<"bridges">>, ?ROOT_KEY, config_key_path()).
%% Used in emqx_bridge_v2
import_config(RawConf, RawConfKey, RootKey, RootKeyPath) ->
BridgesConf = maps:get(RawConfKey, RawConf, #{}),
OldBridgesConf = emqx:get_raw_config(RootKeyPath, #{}),
MergedConf = merge_confs(OldBridgesConf, BridgesConf),
case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
{ok, #{raw_config := NewRawConf}} ->
{ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldBridgesConf, NewRawConf)}};
{ok, #{root_key => RootKey, changed => changed_paths(OldBridgesConf, NewRawConf)}};
Error ->
{error, #{root_key => ?ROOT_KEY, reason => Error}}
{error, #{root_key => RootKey, reason => Error}}
end.
merge_confs(OldConf, NewConf) ->

View File

@ -16,7 +16,7 @@
-module(emqx_bridge_v2).
-behaviour(emqx_config_handler).
% -behaviour(emqx_config_backup).
-behaviour(emqx_config_backup).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -77,6 +77,11 @@
pre_config_update/3
]).
%% Data backup
-export([
import_config/1
]).
%% Compatibility API
-export([
@ -735,6 +740,14 @@ bridge_v2_type_to_connector_type(kafka_producer) ->
bridge_v2_type_to_connector_type(azure_event_hub) ->
azure_event_hub.
%%====================================================================
%% Data backup API
%%====================================================================
import_config(RawConf) ->
%% bridges v2 structure
emqx_bridge:import_config(RawConf, <<"bridges_v2">>, ?ROOT_KEY, config_key_path()).
%%====================================================================
%% Config Update Handler API
%%====================================================================

View File

@ -134,7 +134,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
Result = perform_connector_changes([
#{action => fun emqx_connector_resource:remove/4, data => Removed},
#{
action => fun emqx_connector_resource:create/3,
action => fun emqx_connector_resource:create/4,
data => Added,
on_exception_fn => fun emqx_connector_resource:remove/4
},

View File

@ -56,7 +56,7 @@ enterprise_fields_connectors() -> [].
-endif.
connector_type_to_bridge_types(kafka_producer) -> [kafka_producer];
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub].
actions_config_name() -> <<"bridges_v2">>.

View File

@ -61,6 +61,12 @@
<<"slow_subs">>
]).
%% emqx_bridge_v2 depends on emqx_connector, so connectors need to be imported first
-define(IMPORT_ORDER, [
emqx_connector,
emqx_bridge_v2
]).
-define(DEFAULT_OPTS, #{}).
-define(tar(_FileName_), _FileName_ ++ ?TAR_SUFFIX).
-define(fmt_tar_err(_Expr_),
@ -462,11 +468,12 @@ import_cluster_hocon(BackupDir, Opts) ->
case filelib:is_regular(HoconFileName) of
true ->
{ok, RawConf} = hocon:files([HoconFileName]),
{ok, _} = validate_cluster_hocon(RawConf),
RawConf1 = upgrade_raw_conf(emqx_conf:schema_module(), RawConf),
{ok, _} = validate_cluster_hocon(RawConf1),
maybe_print("Importing cluster configuration...~n", [], Opts),
%% At this point, when all validations have been passed, we want to log errors (if any)
%% but proceed with the next items, instead of aborting the whole import operation
do_import_conf(RawConf, Opts);
do_import_conf(RawConf1, Opts);
false ->
maybe_print("No cluster configuration to be imported.~n", [], Opts),
?SLOG(info, #{
@ -476,6 +483,16 @@ import_cluster_hocon(BackupDir, Opts) ->
#{}
end.
upgrade_raw_conf(SchemaMod, RawConf) ->
_ = SchemaMod:module_info(),
case erlang:function_exported(SchemaMod, upgrade_raw_conf, 1) of
true ->
%% TODO make it a schema module behaviour in hocon_schema
apply(SchemaMod, upgrade_raw_conf, [RawConf]);
false ->
RawConf
end.
read_data_files(RawConf) ->
DataDir = bin(emqx:data_dir()),
{ok, Cwd} = file:get_cwd(),
@ -523,7 +540,7 @@ do_import_conf(RawConf, Opts) ->
GenConfErrs = filter_errors(maps:from_list(import_generic_conf(RawConf))),
maybe_print_errors(GenConfErrs, Opts),
Errors =
lists:foldr(
lists:foldl(
fun(Module, ErrorsAcc) ->
case Module:import_config(RawConf) of
{ok, #{changed := Changed}} ->
@ -534,11 +551,27 @@ do_import_conf(RawConf, Opts) ->
end
end,
GenConfErrs,
find_behaviours(emqx_config_backup)
sort_importer_modules(find_behaviours(emqx_config_backup))
),
maybe_print_errors(Errors, Opts),
Errors.
sort_importer_modules(Modules) ->
lists:sort(
fun(M1, M2) -> order(M1, ?IMPORT_ORDER) =< order(M2, ?IMPORT_ORDER) end,
Modules
).
order(Elem, List) ->
order(Elem, List, 0).
order(_Elem, [], Order) ->
Order;
order(Elem, [Elem | _], Order) ->
Order;
order(Elem, [_ | T], Order) ->
order(Elem, T, Order + 1).
import_generic_conf(Data) ->
lists:map(
fun(Key) ->