Merge pull request #11992 from kjellwinblad/kjell/backport/action_upgrade_downgrade_hooks
fix(action): upgrade and downgrade strategy
This commit is contained in:
commit
b0d670aaa9
|
@ -25,15 +25,39 @@
|
||||||
action_type_to_bridge_v1_type/1,
|
action_type_to_bridge_v1_type/1,
|
||||||
bridge_v1_type_to_action_type/1,
|
bridge_v1_type_to_action_type/1,
|
||||||
is_action_type/1,
|
is_action_type/1,
|
||||||
registered_schema_modules/0
|
registered_schema_modules/0,
|
||||||
|
connector_action_config_to_bridge_v1_config/3,
|
||||||
|
has_custom_connector_action_config_to_bridge_v1_config/1,
|
||||||
|
bridge_v1_config_to_connector_config/2,
|
||||||
|
has_custom_bridge_v1_config_to_connector_config/1,
|
||||||
|
bridge_v1_config_to_action_config/3,
|
||||||
|
has_custom_bridge_v1_config_to_action_config/1,
|
||||||
|
transform_bridge_v1_config_to_action_config/4
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-callback bridge_v1_type_name() -> atom().
|
-callback bridge_v1_type_name() -> atom().
|
||||||
-callback action_type_name() -> atom().
|
-callback action_type_name() -> atom().
|
||||||
-callback connector_type_name() -> atom().
|
-callback connector_type_name() -> atom().
|
||||||
-callback schema_module() -> atom().
|
-callback schema_module() -> atom().
|
||||||
|
%% Define this if the automatic config downgrade is not enough for the bridge.
|
||||||
|
-callback connector_action_config_to_bridge_v1_config(
|
||||||
|
ConnectorConfig :: map(), ActionConfig :: map()
|
||||||
|
) -> map().
|
||||||
|
%% Define this if the automatic config upgrade is not enough for the connector.
|
||||||
|
-callback bridge_v1_config_to_connector_config(BridgeV1Config :: map()) -> map().
|
||||||
|
%% Define this if the automatic config upgrade is not enough for the bridge.
|
||||||
|
%% If you want to make use of the automatic config upgrade, you can call
|
||||||
|
%% emqx_action_info:transform_bridge_v1_config_to_action_config/4 in your
|
||||||
|
%% implementation and do some adjustments on the result.
|
||||||
|
-callback bridge_v1_config_to_action_config(BridgeV1Config :: map(), ConnectorName :: binary()) ->
|
||||||
|
map().
|
||||||
|
|
||||||
-optional_callbacks([bridge_v1_type_name/0]).
|
-optional_callbacks([
|
||||||
|
bridge_v1_type_name/0,
|
||||||
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
bridge_v1_config_to_connector_config/1,
|
||||||
|
bridge_v1_config_to_action_config/2
|
||||||
|
]).
|
||||||
|
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
%% Hadcoded list of info modules for actions
|
%% Hadcoded list of info modules for actions
|
||||||
|
@ -110,10 +134,49 @@ registered_schema_modules() ->
|
||||||
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
||||||
maps:to_list(Schemas).
|
maps:to_list(Schemas).
|
||||||
|
|
||||||
|
has_custom_connector_action_config_to_bridge_v1_config(ActionOrBridgeType) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
erlang:function_exported(Module, connector_action_config_to_bridge_v1_config, 2).
|
||||||
|
|
||||||
|
connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
%% should only be called if defined
|
||||||
|
Module:connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig).
|
||||||
|
|
||||||
|
has_custom_bridge_v1_config_to_connector_config(ActionOrBridgeType) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
erlang:function_exported(Module, bridge_v1_config_to_connector_config, 1).
|
||||||
|
|
||||||
|
bridge_v1_config_to_connector_config(ActionOrBridgeType, BridgeV1Config) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
%% should only be called if defined
|
||||||
|
Module:bridge_v1_config_to_connector_config(BridgeV1Config).
|
||||||
|
|
||||||
|
has_custom_bridge_v1_config_to_action_config(ActionOrBridgeType) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
erlang:function_exported(Module, bridge_v1_config_to_action_config, 2).
|
||||||
|
|
||||||
|
bridge_v1_config_to_action_config(ActionOrBridgeType, BridgeV1Config, ConnectorName) ->
|
||||||
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
|
%% should only be called if defined
|
||||||
|
Module:bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName).
|
||||||
|
|
||||||
|
transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||||
|
) ->
|
||||||
|
emqx_connector_schema:transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||||
|
).
|
||||||
|
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
%% Internal functions for building the info map and accessing it
|
%% Internal functions for building the info map and accessing it
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
|
|
||||||
|
get_action_info_module(ActionOrBridgeType) ->
|
||||||
|
InfoMap = info_map(),
|
||||||
|
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
|
||||||
|
maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined).
|
||||||
|
|
||||||
internal_emqx_action_persistent_term_info_key() ->
|
internal_emqx_action_persistent_term_info_key() ->
|
||||||
?FUNCTION_NAME.
|
?FUNCTION_NAME.
|
||||||
|
|
||||||
|
@ -161,7 +224,8 @@ initial_info_map() ->
|
||||||
bridge_v1_type_to_action_type => #{},
|
bridge_v1_type_to_action_type => #{},
|
||||||
action_type_to_bridge_v1_type => #{},
|
action_type_to_bridge_v1_type => #{},
|
||||||
action_type_to_connector_type => #{},
|
action_type_to_connector_type => #{},
|
||||||
action_type_to_schema_module => #{}
|
action_type_to_schema_module => #{},
|
||||||
|
action_type_to_info_module => #{}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
get_info_map(Module) ->
|
get_info_map(Module) ->
|
||||||
|
@ -195,5 +259,10 @@ get_info_map(Module) ->
|
||||||
},
|
},
|
||||||
action_type_to_schema_module => #{
|
action_type_to_schema_module => #{
|
||||||
ActionType => Module:schema_module()
|
ActionType => Module:schema_module()
|
||||||
|
},
|
||||||
|
action_type_to_info_module => #{
|
||||||
|
ActionType => Module,
|
||||||
|
%% Alias the bridge V1 type to the action type
|
||||||
|
BridgeV1Type => Module
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -1105,9 +1105,23 @@ bridge_v1_lookup_and_transform_helper(
|
||||||
<<"actions">>,
|
<<"actions">>,
|
||||||
emqx_bridge_v2_schema
|
emqx_bridge_v2_schema
|
||||||
),
|
),
|
||||||
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
BridgeV1ConfigFinal =
|
||||||
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
case
|
||||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2),
|
emqx_action_info:has_custom_connector_action_config_to_bridge_v1_config(BridgeV1Type)
|
||||||
|
of
|
||||||
|
false ->
|
||||||
|
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
||||||
|
%% Move parameters to the top level
|
||||||
|
ParametersMap = maps:get(<<"parameters">>, BridgeV1Config1, #{}),
|
||||||
|
BridgeV1Config2 = maps:remove(<<"parameters">>, BridgeV1Config1),
|
||||||
|
BridgeV1Config3 = emqx_utils_maps:deep_merge(BridgeV1Config2, ParametersMap),
|
||||||
|
emqx_utils_maps:deep_merge(ConnectorRawConfig2, BridgeV1Config3);
|
||||||
|
true ->
|
||||||
|
emqx_action_info:connector_action_config_to_bridge_v1_config(
|
||||||
|
BridgeV1Type, ConnectorRawConfig2, BridgeV2RawConfig2
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
BridgeV1Tmp = maps:put(raw_config, BridgeV1ConfigFinal, BridgeV2),
|
||||||
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
||||||
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
||||||
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
bridge_v1_type_name/0,
|
bridge_v1_type_name/0,
|
||||||
action_type_name/0,
|
action_type_name/0,
|
||||||
connector_type_name/0,
|
connector_type_name/0,
|
||||||
schema_module/0
|
schema_module/0,
|
||||||
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
bridge_v1_config_to_action_config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
bridge_v1_type_name() -> azure_event_hub_producer.
|
bridge_v1_type_name() -> azure_event_hub_producer.
|
||||||
|
@ -20,3 +22,11 @@ action_type_name() -> azure_event_hub_producer.
|
||||||
connector_type_name() -> azure_event_hub_producer.
|
connector_type_name() -> azure_event_hub_producer.
|
||||||
|
|
||||||
schema_module() -> emqx_bridge_azure_event_hub.
|
schema_module() -> emqx_bridge_azure_event_hub.
|
||||||
|
|
||||||
|
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||||
|
emqx_bridge_kafka_action_info:connector_action_config_to_bridge_v1_config(
|
||||||
|
ConnectorConfig, ActionConfig
|
||||||
|
).
|
||||||
|
|
||||||
|
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||||
|
emqx_bridge_kafka_action_info:bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName).
|
||||||
|
|
|
@ -10,7 +10,9 @@
|
||||||
bridge_v1_type_name/0,
|
bridge_v1_type_name/0,
|
||||||
action_type_name/0,
|
action_type_name/0,
|
||||||
connector_type_name/0,
|
connector_type_name/0,
|
||||||
schema_module/0
|
schema_module/0,
|
||||||
|
connector_action_config_to_bridge_v1_config/2,
|
||||||
|
bridge_v1_config_to_action_config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
bridge_v1_type_name() -> kafka.
|
bridge_v1_type_name() -> kafka.
|
||||||
|
@ -20,3 +22,31 @@ action_type_name() -> kafka_producer.
|
||||||
connector_type_name() -> kafka_producer.
|
connector_type_name() -> kafka_producer.
|
||||||
|
|
||||||
schema_module() -> emqx_bridge_kafka.
|
schema_module() -> emqx_bridge_kafka.
|
||||||
|
|
||||||
|
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||||
|
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
|
||||||
|
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
|
||||||
|
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
|
||||||
|
|
||||||
|
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
|
||||||
|
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
|
||||||
|
),
|
||||||
|
KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}),
|
||||||
|
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
|
||||||
|
Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}),
|
||||||
|
maps:with(producer_action_field_keys(), Config2).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------------------
|
||||||
|
%% Internal helper functions
|
||||||
|
%%------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
producer_action_field_keys() ->
|
||||||
|
[
|
||||||
|
to_bin(K)
|
||||||
|
|| {K, _} <- emqx_bridge_kafka:fields(kafka_producer_action)
|
||||||
|
].
|
||||||
|
|
||||||
|
to_bin(B) when is_binary(B) -> B;
|
||||||
|
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||||
|
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
||||||
|
|
|
@ -22,7 +22,10 @@
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/2]).
|
-import(hoconsc, [mk/2, ref/2]).
|
||||||
|
|
||||||
-export([transform_bridges_v1_to_connectors_and_bridges_v2/1]).
|
-export([
|
||||||
|
transform_bridges_v1_to_connectors_and_bridges_v2/1,
|
||||||
|
transform_bridge_v1_config_to_action_config/4
|
||||||
|
]).
|
||||||
|
|
||||||
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
|
||||||
|
|
||||||
|
@ -98,53 +101,103 @@ bridge_configs_to_transform(
|
||||||
end.
|
end.
|
||||||
|
|
||||||
split_bridge_to_connector_and_action(
|
split_bridge_to_connector_and_action(
|
||||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}}
|
{ConnectorsMap, {BridgeType, BridgeName, BridgeV1Conf, ConnectorFields, PreviousRawConfig}}
|
||||||
) ->
|
) ->
|
||||||
%% Get connector fields from bridge config
|
ConnectorMap =
|
||||||
ConnectorMap = lists:foldl(
|
case emqx_action_info:has_custom_bridge_v1_config_to_connector_config(BridgeType) of
|
||||||
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
true ->
|
||||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
emqx_action_info:bridge_v1_config_to_connector_config(
|
||||||
true ->
|
BridgeType, BridgeV1Conf
|
||||||
NewToTransform = maps:put(
|
);
|
||||||
to_bin(ConnectorFieldName),
|
false ->
|
||||||
maps:get(to_bin(ConnectorFieldName), BridgeConf),
|
%% We do an automatic transfomation to get the connector config
|
||||||
ToTransformSoFar
|
%% if the callback is not defined.
|
||||||
),
|
%% Get connector fields from bridge config
|
||||||
NewToTransform;
|
lists:foldl(
|
||||||
false ->
|
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||||
ToTransformSoFar
|
case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of
|
||||||
end
|
true ->
|
||||||
|
NewToTransform = maps:put(
|
||||||
|
to_bin(ConnectorFieldName),
|
||||||
|
maps:get(to_bin(ConnectorFieldName), BridgeV1Conf),
|
||||||
|
ToTransformSoFar
|
||||||
|
),
|
||||||
|
NewToTransform;
|
||||||
|
false ->
|
||||||
|
ToTransformSoFar
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
ConnectorFields
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
#{},
|
|
||||||
ConnectorFields
|
|
||||||
),
|
|
||||||
%% Remove connector fields from bridge config to create Action
|
|
||||||
ActionMap0 = lists:foldl(
|
|
||||||
fun
|
|
||||||
({enable, _Spec}, ToTransformSoFar) ->
|
|
||||||
%% Enable filed is used in both
|
|
||||||
ToTransformSoFar;
|
|
||||||
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
|
||||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
|
||||||
true ->
|
|
||||||
maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar);
|
|
||||||
false ->
|
|
||||||
ToTransformSoFar
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
BridgeConf,
|
|
||||||
ConnectorFields
|
|
||||||
),
|
|
||||||
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
||||||
ConnectorName =
|
ConnectorName =
|
||||||
case PreviousRawConfig of
|
case PreviousRawConfig of
|
||||||
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
|
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
|
||||||
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
||||||
end,
|
end,
|
||||||
%% Add connector field to action map
|
ActionMap =
|
||||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
case emqx_action_info:has_custom_bridge_v1_config_to_action_config(BridgeType) of
|
||||||
|
true ->
|
||||||
|
emqx_action_info:bridge_v1_config_to_action_config(
|
||||||
|
BridgeType, BridgeV1Conf, ConnectorName
|
||||||
|
);
|
||||||
|
false ->
|
||||||
|
transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||||
|
)
|
||||||
|
end,
|
||||||
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
||||||
|
|
||||||
|
transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
|
||||||
|
) ->
|
||||||
|
ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName),
|
||||||
|
transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||||
|
).
|
||||||
|
|
||||||
|
transform_bridge_v1_config_to_action_config(
|
||||||
|
BridgeV1Conf, ConnectorName, ConnectorFields
|
||||||
|
) ->
|
||||||
|
TopKeys = [
|
||||||
|
<<"enable">>,
|
||||||
|
<<"connector">>,
|
||||||
|
<<"local_topic">>,
|
||||||
|
<<"resource_opts">>,
|
||||||
|
<<"description">>,
|
||||||
|
<<"parameters">>
|
||||||
|
],
|
||||||
|
TopKeysMap = maps:from_keys(TopKeys, true),
|
||||||
|
%% Remove connector fields
|
||||||
|
ActionMap0 = lists:foldl(
|
||||||
|
fun
|
||||||
|
({enable, _Spec}, ToTransformSoFar) ->
|
||||||
|
%% Enable filed is used in both
|
||||||
|
ToTransformSoFar;
|
||||||
|
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||||
|
ConnectorFieldNameBin = to_bin(ConnectorFieldName),
|
||||||
|
case
|
||||||
|
maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) andalso
|
||||||
|
(not maps:is_key(ConnectorFieldNameBin, TopKeysMap))
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
maps:remove(ConnectorFieldNameBin, ToTransformSoFar);
|
||||||
|
false ->
|
||||||
|
ToTransformSoFar
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
BridgeV1Conf,
|
||||||
|
ConnectorFields
|
||||||
|
),
|
||||||
|
%% Add the connector field
|
||||||
|
ActionMap1 = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
||||||
|
TopMap = maps:with(TopKeys, ActionMap1),
|
||||||
|
RestMap = maps:without(TopKeys, ActionMap1),
|
||||||
|
%% Other parameters should be stuffed into `parameters'
|
||||||
|
emqx_utils_maps:deep_merge(TopMap, #{<<"parameters">> => RestMap}).
|
||||||
|
|
||||||
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
||||||
ConnectorNameList =
|
ConnectorNameList =
|
||||||
case Attempt of
|
case Attempt of
|
||||||
|
|
|
@ -34,7 +34,8 @@
|
||||||
best_effort_recursive_sum/3,
|
best_effort_recursive_sum/3,
|
||||||
if_only_to_toggle_enable/2,
|
if_only_to_toggle_enable/2,
|
||||||
update_if_present/3,
|
update_if_present/3,
|
||||||
put_if/4
|
put_if/4,
|
||||||
|
rename/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([config_key/0, config_key_path/0]).
|
-export_type([config_key/0, config_key_path/0]).
|
||||||
|
@ -309,3 +310,11 @@ put_if(Acc, K, V, true) ->
|
||||||
Acc#{K => V};
|
Acc#{K => V};
|
||||||
put_if(Acc, _K, _V, false) ->
|
put_if(Acc, _K, _V, false) ->
|
||||||
Acc.
|
Acc.
|
||||||
|
|
||||||
|
rename(OldKey, NewKey, Map) ->
|
||||||
|
case maps:find(OldKey, Map) of
|
||||||
|
{ok, Value} ->
|
||||||
|
maps:put(NewKey, Value, maps:remove(OldKey, Map));
|
||||||
|
error ->
|
||||||
|
Map
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue