Merge pull request #11980 from kjellwinblad/kjell/fix_fixup_callbacks/EMQX-11428
fix: bridge to action upgrade fixup hook should run after upgrade
This commit is contained in:
commit
6f8630304d
|
@ -26,8 +26,13 @@
|
|||
bridge_v1_type_to_action_type/1,
|
||||
is_action_type/1,
|
||||
registered_schema_modules/0,
|
||||
action_to_bridge_v1_fixup/2,
|
||||
bridge_v1_to_action_fixup/2
|
||||
connector_action_config_to_bridge_v1_config/3,
|
||||
has_custom_connector_action_config_to_bridge_v1_config/1,
|
||||
bridge_v1_config_to_connector_config/2,
|
||||
has_custom_bridge_v1_config_to_connector_config/1,
|
||||
bridge_v1_config_to_action_config/3,
|
||||
has_custom_bridge_v1_config_to_action_config/1,
|
||||
transform_bridge_v1_config_to_action_config/4
|
||||
]).
|
||||
|
||||
-callback bridge_v1_type_name() ->
|
||||
|
@ -40,14 +45,23 @@
|
|||
-callback connector_type_name() -> atom().
|
||||
-callback schema_module() -> atom().
|
||||
%% Define this if the automatic config downgrade is not enough for the bridge.
|
||||
-callback action_to_bridge_v1_fixup(map()) -> map().
|
||||
-callback connector_action_config_to_bridge_v1_config(
|
||||
ConnectorConfig :: map(), ActionConfig :: map()
|
||||
) -> map().
|
||||
%% Define this if the automatic config upgrade is not enough for the connector.
|
||||
-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map().
|
||||
%% Define this if the automatic config upgrade is not enough for the bridge.
|
||||
-callback bridge_v1_to_action_fixup(map()) -> map().
|
||||
%% If you want to make use of the automatic config upgrade, you can call
|
||||
%% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your
|
||||
%% implementation and do some adjustments on the result.
|
||||
-callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) ->
|
||||
map().
|
||||
|
||||
-optional_callbacks([
|
||||
bridge_v1_type_name/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_config_to_connector_config/1,
|
||||
bridge_v1_config_to_action_config/2
|
||||
]).
|
||||
|
||||
%% ====================================================================
|
||||
|
@ -132,45 +146,38 @@ registered_schema_modules() ->
|
|||
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
||||
maps:to_list(Schemas).
|
||||
|
||||
action_to_bridge_v1_fixup(ActionOrBridgeType, Config) ->
|
||||
has_custom_connector_action_config_to_bridge_v1_config(ActionOrBridgeType) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
case erlang:function_exported(Module, action_to_bridge_v1_fixup, 1) of
|
||||
true ->
|
||||
Module:action_to_bridge_v1_fixup(Config);
|
||||
false ->
|
||||
Config
|
||||
end.
|
||||
erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2).
|
||||
|
||||
bridge_v1_to_action_fixup(ActionOrBridgeType, Config0) ->
|
||||
connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of
|
||||
true ->
|
||||
Config1 = Module:bridge_v1_to_action_fixup(Config0),
|
||||
common_bridge_v1_to_action_adapter(Config1);
|
||||
false ->
|
||||
common_bridge_v1_to_action_adapter(Config0)
|
||||
end.
|
||||
%% should only be called if defined
|
||||
Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig).
|
||||
|
||||
%% ====================================================================
|
||||
%% Helper fns
|
||||
%% ====================================================================
|
||||
has_custom_bridge_v1_config_to_connector_config(ActionOrBridgeType) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
erlang:function_exported(Module, bridge_v1_config_to_connector_config, 1).
|
||||
|
||||
common_bridge_v1_to_action_adapter(RawConfig) ->
|
||||
TopKeys = [
|
||||
<<"enable">>,
|
||||
<<"connector">>,
|
||||
<<"local_topic">>,
|
||||
<<"resource_opts">>,
|
||||
<<"description">>,
|
||||
<<"parameters">>
|
||||
],
|
||||
TopMap = maps:with(TopKeys, RawConfig),
|
||||
RestMap = maps:without(TopKeys, RawConfig),
|
||||
%% Other parameters should be stuffed into `parameters'
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"parameters">>,
|
||||
fun(Old) -> emqx_utils_maps:deep_merge(Old, RestMap) end,
|
||||
TopMap
|
||||
bridge_v1_config_to_connector_config(ActionOrBridgeType, BridgeV1Config) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
%% should only be called if defined
|
||||
Module:bridge_v1_config_to_connector_config(BridgeV1Config).
|
||||
|
||||
has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
erlang:function_exported(Module, bridge_v1_config_to_action_config, 2).
|
||||
|
||||
bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
%% should only be called if defined
|
||||
Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName).
|
||||
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||
) ->
|
||||
emqx_connector_schema:transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||
).
|
||||
|
||||
%% ====================================================================
|
||||
|
|
|
@ -1102,10 +1102,23 @@ bridge_v1_lookup_and_transform_helper(
|
|||
<<"actions">>,
|
||||
emqx_bridge_v2_schema
|
||||
),
|
||||
BridgeV1ConfigFinal =
|
||||
case
|
||||
emqx_action_info:has_custom_connector_action_config_to_bridge_v1_config(BridgeV1Type)
|
||||
of
|
||||
false ->
|
||||
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
||||
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
||||
BridgeV1Config3 = emqx_action_info:action_to_bridge_v1_fixup(BridgeV2Type, BridgeV1Config2),
|
||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config3, BridgeV2),
|
||||
%% Move parameters to the top level
|
||||
ParametersMap = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
|
||||
BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
|
||||
BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap),
|
||||
emqx_utils_maps:deep_merge(ConnectorRawConfig2, BridgeV1Config3);
|
||||
true ->
|
||||
emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||
BridgeV1Type, ConnectorRawConfig2, BridgeV2RawConfig2
|
||||
)
|
||||
end,
|
||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1ConfigFinal, BridgeV2),
|
||||
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
||||
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
||||
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
||||
|
|
|
@ -11,8 +11,8 @@
|
|||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_config_to_action_config/2
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> azure_event_hub_producer.
|
||||
|
@ -23,8 +23,10 @@ connector_type_name() -> azure_event_hub_producer.
|
|||
|
||||
schema_module() -> emqx_bridge_azure_event_hub.
|
||||
|
||||
action_to_bridge_v1_fixup(Config) ->
|
||||
emqx_bridge_kafka_action_info:action_to_bridge_v1_fixup(Config).
|
||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||
emqx_bridge_kafka_action_info:connector_action_config_to_bridge_v1_config(
|
||||
ConnectorConfig, ActionConfig
|
||||
).
|
||||
|
||||
bridge_v1_to_action_fixup(Config) ->
|
||||
emqx_bridge_kafka_action_info:bridge_v1_to_action_fixup(Config).
|
||||
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||
emqx_bridge_kafka_action_info:bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName).
|
||||
|
|
|
@ -11,8 +11,8 @@
|
|||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
connector_action_config_to_bridge_v1_config/2,
|
||||
bridge_v1_config_to_action_config/2
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> kafka.
|
||||
|
@ -23,15 +23,22 @@ connector_type_name() -> kafka_producer.
|
|||
|
||||
schema_module() -> emqx_bridge_kafka.
|
||||
|
||||
action_to_bridge_v1_fixup(Config) ->
|
||||
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, Config).
|
||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
|
||||
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
|
||||
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
|
||||
|
||||
bridge_v1_to_action_fixup(Config0) ->
|
||||
Config = emqx_utils_maps:rename(<<"kafka">>, <<"parameters">>, Config0),
|
||||
maps:with(producer_action_field_keys(), Config).
|
||||
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
|
||||
),
|
||||
KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}),
|
||||
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
|
||||
Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}),
|
||||
maps:with(producer_action_field_keys(), Config2).
|
||||
|
||||
%%------------------------------------------------------------------------------------------
|
||||
%% Internal helper fns
|
||||
%% Internal helper functions
|
||||
%%------------------------------------------------------------------------------------------
|
||||
|
||||
producer_action_field_keys() ->
|
||||
|
|
|
@ -22,7 +22,10 @@
|
|||
|
||||
-import(hoconsc, [mk/2, ref/2]).
|
||||
|
||||
-export([transform_bridges_v1_to_connectors_and_bridges_v2/1]).
|
||||
-export([
|
||||
transform_bridges_v1_to_connectors_and_bridges_v2/1,
|
||||
transform_bridge_v1_config_to_action_config/4
|
||||
]).
|
||||
|
||||
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
||||
|
||||
|
@ -98,16 +101,25 @@ bridge_configs_to_transform(
|
|||
end.
|
||||
|
||||
split_bridge_to_connector_and_action(
|
||||
{ConnectorsMap, {BridgeType, BridgeName, ActionConf, ConnectorFields, PreviousRawConfig}}
|
||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}}
|
||||
) ->
|
||||
ConnectorMap =
|
||||
case emqx_action_info:has_custom_bridge_v1_config_to_connector_config(BridgeType) of
|
||||
true ->
|
||||
emqx_action_info:bridge_v1_config_to_connector_config(
|
||||
BridgeType, BridgeV1Conf
|
||||
);
|
||||
false ->
|
||||
%% We do an automatic transfomation to get the connector config
|
||||
%% if the callback is not defined.
|
||||
%% Get connector fields from bridge config
|
||||
ConnectorMap = lists:foldl(
|
||||
lists:foldl(
|
||||
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||
case maps:is_key(to_bin(ConnectorFieldName), ActionConf) of
|
||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of
|
||||
true ->
|
||||
NewToTransform = maps:put(
|
||||
to_bin(ConnectorFieldName),
|
||||
maps:get(to_bin(ConnectorFieldName), ActionConf),
|
||||
maps:get(to_bin(ConnectorFieldName), BridgeV1Conf),
|
||||
ToTransformSoFar
|
||||
),
|
||||
NewToTransform;
|
||||
|
@ -117,17 +129,75 @@ split_bridge_to_connector_and_action(
|
|||
end,
|
||||
#{},
|
||||
ConnectorFields
|
||||
),
|
||||
)
|
||||
end,
|
||||
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
||||
ConnectorName =
|
||||
case PreviousRawConfig of
|
||||
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
|
||||
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
||||
end,
|
||||
%% Add connector field to action map
|
||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionConf),
|
||||
ActionMap =
|
||||
case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of
|
||||
true ->
|
||||
emqx_action_info:bridge_v1_config_to_action_config(
|
||||
BridgeType, BridgeV1Conf, ConnectorName
|
||||
);
|
||||
false ->
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
)
|
||||
end,
|
||||
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
||||
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||
) ->
|
||||
ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName),
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
).
|
||||
|
||||
transform_bridge_v1_config_to_action_config(
|
||||
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||
) ->
|
||||
TopKeys = [
|
||||
<<"enable">>,
|
||||
<<"connector">>,
|
||||
<<"local_topic">>,
|
||||
<<"resource_opts">>,
|
||||
<<"description">>,
|
||||
<<"parameters">>
|
||||
],
|
||||
TopKeysMap = maps:from_keys(TopKeys, true),
|
||||
%% Remove connector fields
|
||||
ActionMap0 = lists:foldl(
|
||||
fun
|
||||
({enable, _Spec}, ToTransformSoFar) ->
|
||||
%% Enable filed is used in both
|
||||
ToTransformSoFar;
|
||||
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||
ConnectorFieldNameBin = to_bin(ConnectorFieldName),
|
||||
case
|
||||
maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) andalso
|
||||
(not maps:is_key(ConnectorFieldNameBin, TopKeysMap))
|
||||
of
|
||||
true ->
|
||||
maps:remove(ConnectorFieldNameBin, ToTransformSoFar);
|
||||
false ->
|
||||
ToTransformSoFar
|
||||
end
|
||||
end,
|
||||
BridgeV1Conf,
|
||||
ConnectorFields
|
||||
),
|
||||
%% Add the connector field
|
||||
ActionMap1 = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
||||
TopMap = maps:with(TopKeys, ActionMap1),
|
||||
RestMap = maps:without(TopKeys, ActionMap1),
|
||||
%% Other parameters should be stuffed into `parameters'
|
||||
emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}).
|
||||
|
||||
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
||||
ConnectorNameList =
|
||||
case Attempt of
|
||||
|
@ -174,7 +244,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
|||
),
|
||||
%% Add connectors and actions and remove bridges
|
||||
lists:foldl(
|
||||
fun({BridgeType, BridgeName, ActionMap0, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
||||
fun({BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
||||
%% Add connector
|
||||
RawConfigSoFar1 = emqx_utils_maps:deep_put(
|
||||
[<<"connectors">>, to_bin(ConnectorType), ConnectorName],
|
||||
|
@ -186,7 +256,6 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
|||
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
||||
RawConfigSoFar1
|
||||
),
|
||||
ActionMap = emqx_action_info:bridge_v1_to_action_fixup(BridgeType, ActionMap0),
|
||||
%% Add action
|
||||
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
||||
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
||||
|
|
Loading…
Reference in New Issue