feat: callbacks for fixup after automatic Bridge V1 upgrade/downgrade
This commit adds callbacks to the emqx_action_info module for doing fixes (such as changing a field name) after the automatic split of a Bridge V1 config or the merge of connector and action configs for the compatibility layer.
This commit is contained in:
parent
9feba802e9
commit
86c126ffcd
|
@ -25,15 +25,25 @@
|
||||||
action_type_to_bridge_v1_type/1,
|
action_type_to_bridge_v1_type/1,
|
||||||
bridge_v1_type_to_action_type/1,
|
bridge_v1_type_to_action_type/1,
|
||||||
is_action_type/1,
|
is_action_type/1,
|
||||||
registered_schema_modules/0
|
registered_schema_modules/0,
|
||||||
|
action_to_bridge_v1_fixup/2,
|
||||||
|
bridge_v1_to_action_fixup/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback bridge_v1_type_name() -> atom().
|
-callback bridge_v1_type_name() -> atom().
|
||||||
-callback action_type_name() -> atom().
|
-callback action_type_name() -> atom().
|
||||||
-callback connector_type_name() -> atom().
|
-callback connector_type_name() -> atom().
|
||||||
-callback schema_module() -> atom().
|
-callback schema_module() -> atom().
|
||||||
|
%% Define this if the automatic config downgrade is not enough for the bridge.
|
||||||
|
-callback action_to_bridge_v1_fixup(map()) -> term().
|
||||||
|
%% Define this if the automatic config upgrade is not enough for the bridge.
|
||||||
|
-callback bridge_v1_to_action_fixup(map()) -> term().
|
||||||
|
|
||||||
-optional_callbacks([bridge_v1_type_name/0]).
|
-optional_callbacks([
|
||||||
|
bridge_v1_type_name/0,
|
||||||
|
action_to_bridge_v1_fixup/1,
|
||||||
|
bridge_v1_to_action_fixup/1
|
||||||
|
]).
|
||||||
|
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
%% Hadcoded list of info modules for actions
|
%% Hadcoded list of info modules for actions
|
||||||
|
@ -111,10 +121,33 @@ registered_schema_modules() ->
|
||||||
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
||||||
maps:to_list(Schemas).
|
maps:to_list(Schemas).
|
||||||
|
|
||||||
|
action_to_bridge_v1_fixup(ActionOrBridgeType, Config) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
case erlang:function_exported(Module, action_to_bridge_v1_fixup, 1) of
|
||||||
|
true ->
|
||||||
|
Module:action_to_bridge_v1_fixup(Config);
|
||||||
|
false ->
|
||||||
|
Config
|
||||||
|
end.
|
||||||
|
|
||||||
|
bridge_v1_to_action_fixup(ActionOrBridgeType, Config) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of
|
||||||
|
true ->
|
||||||
|
Module:bridge_v1_to_action_fixup(Config);
|
||||||
|
false ->
|
||||||
|
Config
|
||||||
|
end.
|
||||||
|
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
%% Internal functions for building the info map and accessing it
|
%% Internal functions for building the info map and accessing it
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
|
|
||||||
|
get_action_info_module(ActionOrBridgeType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
|
||||||
|
maps:get(ActionOrBridgeType, ActionInfoModuleMap).
|
||||||
|
|
||||||
internal_emqx_action_persistent_term_info_key() ->
|
internal_emqx_action_persistent_term_info_key() ->
|
||||||
?FUNCTION_NAME.
|
?FUNCTION_NAME.
|
||||||
|
|
||||||
|
@ -162,7 +195,8 @@ initial_info_map() ->
|
||||||
bridge_v1_type_to_action_type => #{},
|
bridge_v1_type_to_action_type => #{},
|
||||||
action_type_to_bridge_v1_type => #{},
|
action_type_to_bridge_v1_type => #{},
|
||||||
action_type_to_connector_type => #{},
|
action_type_to_connector_type => #{},
|
||||||
action_type_to_schema_module => #{}
|
action_type_to_schema_module => #{},
|
||||||
|
action_type_to_info_module => #{}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
get_info_map(Module) ->
|
get_info_map(Module) ->
|
||||||
|
@ -196,5 +230,9 @@ get_info_map(Module) ->
|
||||||
},
|
},
|
||||||
action_type_to_schema_module => #{
|
action_type_to_schema_module => #{
|
||||||
ActionType => Module:schema_module()
|
ActionType => Module:schema_module()
|
||||||
|
},
|
||||||
|
action_type_to_info_module => #{
|
||||||
|
ActionType => Module,
|
||||||
|
BridgeV1Type => Module
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -900,7 +900,7 @@ format_resource(
|
||||||
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
||||||
true ->
|
true ->
|
||||||
%% The defaults are already filled in
|
%% The defaults are already filled in
|
||||||
downgrade_raw_conf(Type, RawConf);
|
RawConf;
|
||||||
false ->
|
false ->
|
||||||
fill_defaults(Type, RawConf)
|
fill_defaults(Type, RawConf)
|
||||||
end,
|
end,
|
||||||
|
@ -1164,19 +1164,3 @@ upgrade_type(Type) ->
|
||||||
|
|
||||||
downgrade_type(Type) ->
|
downgrade_type(Type) ->
|
||||||
emqx_bridge_lib:downgrade_type(Type).
|
emqx_bridge_lib:downgrade_type(Type).
|
||||||
|
|
||||||
%% TODO: move it to callback
|
|
||||||
downgrade_raw_conf(kafka_producer, RawConf) ->
|
|
||||||
rename(<<"parameters">>, <<"kafka">>, RawConf);
|
|
||||||
downgrade_raw_conf(azure_event_hub_producer, RawConf) ->
|
|
||||||
rename(<<"parameters">>, <<"kafka">>, RawConf);
|
|
||||||
downgrade_raw_conf(_Type, RawConf) ->
|
|
||||||
RawConf.
|
|
||||||
|
|
||||||
rename(OldKey, NewKey, Map) ->
|
|
||||||
case maps:find(OldKey, Map) of
|
|
||||||
{ok, Value} ->
|
|
||||||
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
|
||||||
error ->
|
|
||||||
Map
|
|
||||||
end.
|
|
||||||
|
|
|
@ -1104,7 +1104,8 @@ bridge_v1_lookup_and_transform_helper(
|
||||||
),
|
),
|
||||||
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
||||||
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
||||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2),
|
BridgeV1Config3 = emqx_action_info:action_to_bridge_v1_fixup(BridgeV2Type, BridgeV1Config2),
|
||||||
|
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config3, BridgeV2),
|
||||||
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
||||||
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
||||||
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
bridge_v1_type_name/0,
|
bridge_v1_type_name/0,
|
||||||
action_type_name/0,
|
action_type_name/0,
|
||||||
connector_type_name/0,
|
connector_type_name/0,
|
||||||
schema_module/0
|
schema_module/0,
|
||||||
|
action_to_bridge_v1_fixup/1,
|
||||||
|
bridge_v1_to_action_fixup/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
bridge_v1_type_name() -> azure_event_hub_producer.
|
bridge_v1_type_name() -> azure_event_hub_producer.
|
||||||
|
@ -20,3 +22,19 @@ action_type_name() -> azure_event_hub_producer.
|
||||||
connector_type_name() -> azure_event_hub_producer.
|
connector_type_name() -> azure_event_hub_producer.
|
||||||
|
|
||||||
schema_module() -> emqx_bridge_azure_event_hub.
|
schema_module() -> emqx_bridge_azure_event_hub.
|
||||||
|
|
||||||
|
action_to_bridge_v1_fixup(Config) ->
|
||||||
|
rename(<<"parameters">>, <<"kafka">>, Config).
|
||||||
|
|
||||||
|
rename(OldKey, NewKey, Map) ->
|
||||||
|
case maps:find(OldKey, Map) of
|
||||||
|
{ok, Value} ->
|
||||||
|
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
||||||
|
error ->
|
||||||
|
Map
|
||||||
|
end.
|
||||||
|
|
||||||
|
bridge_v1_to_action_fixup(Config) ->
|
||||||
|
KafkaField = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config, #{}),
|
||||||
|
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config),
|
||||||
|
emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaField}).
|
||||||
|
|
|
@ -610,7 +610,7 @@ producer_opts() ->
|
||||||
].
|
].
|
||||||
|
|
||||||
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
|
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
|
||||||
%% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0)
|
%% However we need to keep it backward compatible for generated schema json (version 0.1.0)
|
||||||
%% since schema is data for the 'schemas' API.
|
%% since schema is data for the 'schemas' API.
|
||||||
parameters_field() ->
|
parameters_field() ->
|
||||||
{Name, Alias} =
|
{Name, Alias} =
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
bridge_v1_type_name/0,
|
bridge_v1_type_name/0,
|
||||||
action_type_name/0,
|
action_type_name/0,
|
||||||
connector_type_name/0,
|
connector_type_name/0,
|
||||||
schema_module/0
|
schema_module/0,
|
||||||
|
action_to_bridge_v1_fixup/1,
|
||||||
|
bridge_v1_to_action_fixup/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
bridge_v1_type_name() -> kafka.
|
bridge_v1_type_name() -> kafka.
|
||||||
|
@ -20,3 +22,19 @@ action_type_name() -> kafka_producer.
|
||||||
connector_type_name() -> kafka_producer.
|
connector_type_name() -> kafka_producer.
|
||||||
|
|
||||||
schema_module() -> emqx_bridge_kafka.
|
schema_module() -> emqx_bridge_kafka.
|
||||||
|
|
||||||
|
action_to_bridge_v1_fixup(Config) ->
|
||||||
|
rename(<<"parameters">>, <<"kafka">>, Config).
|
||||||
|
|
||||||
|
rename(OldKey, NewKey, Map) ->
|
||||||
|
case maps:find(OldKey, Map) of
|
||||||
|
{ok, Value} ->
|
||||||
|
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
||||||
|
error ->
|
||||||
|
Map
|
||||||
|
end.
|
||||||
|
|
||||||
|
bridge_v1_to_action_fixup(Config) ->
|
||||||
|
KafkaField = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config, #{}),
|
||||||
|
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config),
|
||||||
|
emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaField}).
|
||||||
|
|
|
@ -203,11 +203,21 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
||||||
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
||||||
RawConfigSoFar1
|
RawConfigSoFar1
|
||||||
),
|
),
|
||||||
|
%% Take fields that should be in the top level of the action map
|
||||||
|
TopLevelFields = [<<"resource_opts">>, <<"enable">>, <<"connector">>],
|
||||||
|
TopLevelActionFields = maps:with(TopLevelFields, ActionMap),
|
||||||
|
ParametersActionFields = maps:without(TopLevelFields, ActionMap),
|
||||||
|
%% Action map should be wrapped under parameters key
|
||||||
|
WrappedParameters = #{<<"parameters">> => ParametersActionFields},
|
||||||
|
FinalActionMap = maps:merge(TopLevelActionFields, WrappedParameters),
|
||||||
|
FixedActionMap = emqx_action_info:bridge_v1_to_action_fixup(
|
||||||
|
BridgeType, FinalActionMap
|
||||||
|
),
|
||||||
%% Add action
|
%% Add action
|
||||||
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
||||||
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
||||||
RawConfigSoFar2,
|
RawConfigSoFar2,
|
||||||
ActionMap
|
FixedActionMap
|
||||||
),
|
),
|
||||||
RawConfigSoFar3
|
RawConfigSoFar3
|
||||||
end,
|
end,
|
||||||
|
|
Loading…
Reference in New Issue