fix(action): upgrade and downgrade strategy

This commit fixes the upgrade and downgrade strategy when upgrading
from a bridge V1 to connector and action or the other way around so that
the custom callbacks get the complete unchanged input instead of the
result of the automatic translation. The automatic translation is used
if the callback is not defined.
This commit is contained in:
Kjell Winblad 2023-11-20 16:40:27 +01:00
parent 79a764f117
commit 3aa8044475
5 changed files with 107 additions and 77 deletions

View File

@ -26,8 +26,11 @@
bridge_v1_type_to_action_type/1, bridge_v1_type_to_action_type/1,
is_action_type/1, is_action_type/1,
registered_schema_modules/0, registered_schema_modules/0,
action_to_bridge_v1_fixup/2, connector_action_config_to_bridge_v1_config/3,
bridge_v1_to_action_fixup/2 has_custom_connector_action_config_to_bridge_v1_config/1,
bridge_v1_config_to_action_config/3,
has_custom_bridge_v1_config_to_action_config/1,
transform_bridge_v1_config_to_action_config/4
]). ]).
-callback bridge_v1_type_name() -> atom(). -callback bridge_v1_type_name() -> atom().
@ -35,14 +38,20 @@
-callback connector_type_name() -> atom(). -callback connector_type_name() -> atom().
-callback schema_module() -> atom(). -callback schema_module() -> atom().
%% Define this if the automatic config downgrade is not enough for the bridge. %% Define this if the automatic config downgrade is not enough for the bridge.
-callback action_to_bridge_v1_fixup(map()) -> map(). -callback connector_action_config_to_bridge_v1_config(
ConnectorConfig :: map(), ActionConfig :: map()
) -> map().
%% Define this if the automatic config upgrade is not enough for the bridge. %% Define this if the automatic config upgrade is not enough for the bridge.
-callback bridge_v1_to_action_fixup(map()) -> map(). %% If you want to make use of the automatic config upgrade, you can call
%% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your
%% implementation and do some adjustments on the result.
-callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) ->
map().
-optional_callbacks([ -optional_callbacks([
bridge_v1_type_name/0, bridge_v1_type_name/0,
action_to_bridge_v1_fixup/1, connector_action_config_to_bridge_v1_config/2,
bridge_v1_to_action_fixup/1 bridge_v1_config_to_action_config/2
]). ]).
%% ==================================================================== %% ====================================================================
@ -121,45 +130,29 @@ registered_schema_modules() ->
Schemas = maps:get(action_type_to_schema_module, InfoMap), Schemas = maps:get(action_type_to_schema_module, InfoMap),
maps:to_list(Schemas). maps:to_list(Schemas).
action_to_bridge_v1_fixup(ActionOrBridgeType, Config) -> has_custom_connector_action_config_to_bridge_v1_config(ActionOrBridgeType) ->
Module = get_action_info_module(ActionOrBridgeType), Module = get_action_info_module(ActionOrBridgeType),
case erlang:function_exported(Module, action_to_bridge_v1_fixup, 1) of erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2).
true ->
Module:action_to_bridge_v1_fixup(Config);
false ->
Config
end.
bridge_v1_to_action_fixup(ActionOrBridgeType, Config0) -> connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
Module = get_action_info_module(ActionOrBridgeType), Module = get_action_info_module(ActionOrBridgeType),
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of %% should only be called if defined
true -> Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig).
Config1 = Module:bridge_v1_to_action_fixup(Config0),
common_bridge_v1_to_action_adapter(Config1);
false ->
common_bridge_v1_to_action_adapter(Config0)
end.
%% ==================================================================== has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) ->
%% Helper fns Module = get_action_info_module(ActionOrBridgeType),
%% ==================================================================== erlang:function_exported(Module, bridge_v1_config_to_action_config, 2).
common_bridge_v1_to_action_adapter(RawConfig) -> bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) ->
TopKeys = [ Module = get_action_info_module(ActionOrBridgeType),
<<"enable">>, %% should only be called if defined
<<"connector">>, Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName).
<<"local_topic">>,
<<"resource_opts">>, transform_bridge_v1_config_to_action_config(
<<"description">>, BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
<<"parameters">> ) ->
], emqx_connector_schema:transform_bridge_v1_config_to_action_config(
TopMap = maps:with(TopKeys, RawConfig), BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
RestMap = maps:without(TopKeys, RawConfig),
%% Other parameters should be stuffed into `parameters'
emqx_utils_maps:update_if_present(
<<"parameters">>,
fun(Old) -> emqx_utils_maps:deep_merge(Old, RestMap) end,
TopMap
). ).
%% ==================================================================== %% ====================================================================

View File

@ -1102,10 +1102,23 @@ bridge_v1_lookup_and_transform_helper(
<<"actions">>, <<"actions">>,
emqx_bridge_v2_schema emqx_bridge_v2_schema
), ),
BridgeV1ConfigFinal =
case
emqx_action_info:has_custom_connector_action_config_to_bridge_v1_config(BridgeV1Type)
of
false ->
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2), BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2), %% Move parameters to the top level
BridgeV1Config3 = emqx_action_info:action_to_bridge_v1_fixup(BridgeV2Type, BridgeV1Config2), ParametersMap = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config3, BridgeV2), BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap),
emqx_utils_maps:deep_merge(ConnectorRawConfig2, BridgeV1Config3);
true ->
emqx_action_info:connector_action_config_to_bridge_v1_config(
BridgeV1Type, ConnectorRawConfig2, BridgeV2RawConfig2
)
end,
BridgeV1Tmp = maps:put(raw_config, BridgeV1ConfigFinal, BridgeV2),
BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV1 = maps:remove(status, BridgeV1Tmp),
BridgeV2Status = maps:get(status, BridgeV2, undefined), BridgeV2Status = maps:get(status, BridgeV2, undefined),
BridgeV2Error = maps:get(error, BridgeV2, undefined), BridgeV2Error = maps:get(error, BridgeV2, undefined),

View File

@ -11,8 +11,8 @@
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0, schema_module/0,
action_to_bridge_v1_fixup/1, connector_action_config_to_bridge_v1_config/2,
bridge_v1_to_action_fixup/1 bridge_v1_config_to_action_config/2
]). ]).
bridge_v1_type_name() -> azure_event_hub_producer. bridge_v1_type_name() -> azure_event_hub_producer.
@ -23,8 +23,10 @@ connector_type_name() -> azure_event_hub_producer.
schema_module() -> emqx_bridge_azure_event_hub. schema_module() -> emqx_bridge_azure_event_hub.
action_to_bridge_v1_fixup(Config) -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_bridge_kafka_action_info:action_to_bridge_v1_fixup(Config). emqx_bridge_kafka_action_info:connector_action_config_to_bridge_v1_config(
ConnectorConfig, ActionConfig
).
bridge_v1_to_action_fixup(Config) -> bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
emqx_bridge_kafka_action_info:bridge_v1_to_action_fixup(Config). bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName).

View File

@ -11,8 +11,8 @@
action_type_name/0, action_type_name/0,
connector_type_name/0, connector_type_name/0,
schema_module/0, schema_module/0,
action_to_bridge_v1_fixup/1, connector_action_config_to_bridge_v1_config/2,
bridge_v1_to_action_fixup/1 bridge_v1_config_to_action_config/2
]). ]).
bridge_v1_type_name() -> kafka. bridge_v1_type_name() -> kafka.
@ -23,17 +23,23 @@ connector_type_name() -> kafka_producer.
schema_module() -> emqx_bridge_kafka. schema_module() -> emqx_bridge_kafka.
action_to_bridge_v1_fixup(Config) -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, Config). BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
bridge_v1_to_action_fixup(Config0) -> bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
BridgeV1Conf,
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
),
KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0), KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0),
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0), Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
Config2 = maps:put(<<"parameters">>, KafkaMap, Config1), Config2 = maps:put(<<"parameters">>, KafkaMap, Config1),
maps:with(producer_action_field_keys(), Config2). maps:with(producer_action_field_keys(), Config2).
%%------------------------------------------------------------------------------------------ %%------------------------------------------------------------------------------------------
%% Internal helper fns %% Internal helper functions
%%------------------------------------------------------------------------------------------ %%------------------------------------------------------------------------------------------
producer_action_field_keys() -> producer_action_field_keys() ->

View File

@ -22,7 +22,10 @@
-import(hoconsc, [mk/2, ref/2]). -import(hoconsc, [mk/2, ref/2]).
-export([transform_bridges_v1_to_connectors_and_bridges_v2/1]). -export([
transform_bridges_v1_to_connectors_and_bridges_v2/1,
transform_bridge_v1_config_to_action_config/4
]).
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
@ -124,23 +127,39 @@ split_bridge_to_connector_and_action(
#{<<"connector">> := ConnectorName0} -> ConnectorName0; #{<<"connector">> := ConnectorName0} -> ConnectorName0;
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0) _ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
end, end,
%% Add connector field to action map ActionMap =
ActionMap = transform_bridge_v1_to_action( case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of
BridgeType, BridgeV1Conf, ConnectorName, ConnectorFields true ->
), emqx_action_info:bridge_v1_config_to_action_config(
BridgeType, BridgeV1Conf, ConnectorName
);
false ->
transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorFields
)
end,
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}. {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
transform_bridge_v1_to_action(BridgeType, BridgeV1Conf, ConnectorName, ConnectorFields) -> transform_bridge_v1_config_to_action_config(
BridgeV1ConfKey = <<"__bridge_v1_conf__">>, BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
) ->
ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName),
transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorFields
).
transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, ConnectorFields
) ->
TopKeys = [ TopKeys = [
<<"enable">>, <<"enable">>,
<<"connector">>, <<"connector">>,
<<"local_topic">>, <<"local_topic">>,
<<"resource_opts">>, <<"resource_opts">>,
<<"description">>, <<"description">>,
<<"parameters">>, <<"parameters">>
BridgeV1ConfKey
], ],
TopKeysMap = maps:from_keys(TopKeys, true),
%% Remove connector fields %% Remove connector fields
ActionMap0 = lists:foldl( ActionMap0 = lists:foldl(
fun fun
@ -148,7 +167,11 @@ transform_bridge_v1_to_action(BridgeType, BridgeV1Conf, ConnectorName, Connector
%% Enable filed is used in both %% Enable filed is used in both
ToTransformSoFar; ToTransformSoFar;
({ConnectorFieldName, _Spec}, ToTransformSoFar) -> ({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of ConnectorFieldNameBin = to_bin(ConnectorFieldName),
case
maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) andalso
(not maps:is_key(ConnectorFieldNameBin, TopKeysMap))
of
true -> true ->
maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar); maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar);
false -> false ->
@ -158,19 +181,12 @@ transform_bridge_v1_to_action(BridgeType, BridgeV1Conf, ConnectorName, Connector
BridgeV1Conf, BridgeV1Conf,
ConnectorFields ConnectorFields
), ),
%% Add special key as the whole original bridge config might be needed by
%% the fixup function
ActionMap1 = emqx_utils_maps:deep_put([BridgeV1ConfKey], ActionMap0, BridgeV1Conf),
%% Add the connector field %% Add the connector field
ActionMap2 = maps:put(<<"connector">>, ConnectorName, ActionMap1), ActionMap1 = maps:put(<<"connector">>, ConnectorName, ActionMap0),
TopMap = maps:with(TopKeys, ActionMap2), TopMap = maps:with(TopKeys, ActionMap1),
RestMap = maps:without(TopKeys, ActionMap2), RestMap = maps:without(TopKeys, ActionMap1),
%% Other parameters should be stuffed into `parameters' %% Other parameters should be stuffed into `parameters'
ActionMap = emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}), emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}).
%% Run the fixup callback if it is defined
FixedActionMap = emqx_action_info:bridge_v1_to_action_fixup(BridgeType, ActionMap),
%% remove the special key as it is not needed anymore
maps:without([BridgeV1ConfKey], FixedActionMap).
generate_connector_name(ConnectorsMap, BridgeName, Attempt) -> generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
ConnectorNameList = ConnectorNameList =