Merge pull request #11968 from kjellwinblad/kjell/bridge_v1_upgrade_downgrade_fixup_callbacks
Bridge V1 config upgrade downgrade fixup callbacks
This commit is contained in:
commit
6c19d1e78e
|
@ -25,15 +25,25 @@
|
|||
action_type_to_bridge_v1_type/1,
|
||||
bridge_v1_type_to_action_type/1,
|
||||
is_action_type/1,
|
||||
registered_schema_modules/0
|
||||
registered_schema_modules/0,
|
||||
action_to_bridge_v1_fixup/2,
|
||||
bridge_v1_to_action_fixup/2
|
||||
]).
|
||||
|
||||
-callback bridge_v1_type_name() -> atom().
|
||||
-callback action_type_name() -> atom().
|
||||
-callback connector_type_name() -> atom().
|
||||
-callback schema_module() -> atom().
|
||||
%% Define this if the automatic config downgrade is not enough for the bridge.
|
||||
-callback action_to_bridge_v1_fixup(map()) -> map().
|
||||
%% Define this if the automatic config upgrade is not enough for the bridge.
|
||||
-callback bridge_v1_to_action_fixup(map()) -> map().
|
||||
|
||||
-optional_callbacks([bridge_v1_type_name/0]).
|
||||
-optional_callbacks([
|
||||
bridge_v1_type_name/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
]).
|
||||
|
||||
%% ====================================================================
|
||||
%% Hadcoded list of info modules for actions
|
||||
|
@ -111,10 +121,56 @@ registered_schema_modules() ->
|
|||
Schemas = maps:get(action_type_to_schema_module, InfoMap),
|
||||
maps:to_list(Schemas).
|
||||
|
||||
action_to_bridge_v1_fixup(ActionOrBridgeType, Config) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
case erlang:function_exported(Module, action_to_bridge_v1_fixup, 1) of
|
||||
true ->
|
||||
Module:action_to_bridge_v1_fixup(Config);
|
||||
false ->
|
||||
Config
|
||||
end.
|
||||
|
||||
bridge_v1_to_action_fixup(ActionOrBridgeType, Config0) ->
|
||||
Module = get_action_info_module(ActionOrBridgeType),
|
||||
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of
|
||||
true ->
|
||||
Config1 = Module:bridge_v1_to_action_fixup(Config0),
|
||||
common_bridge_v1_to_action_adapter(Config1);
|
||||
false ->
|
||||
common_bridge_v1_to_action_adapter(Config0)
|
||||
end.
|
||||
|
||||
%% ====================================================================
|
||||
%% Helper fns
|
||||
%% ====================================================================
|
||||
|
||||
common_bridge_v1_to_action_adapter(RawConfig) ->
|
||||
TopKeys = [
|
||||
<<"enable">>,
|
||||
<<"connector">>,
|
||||
<<"local_topic">>,
|
||||
<<"resource_opts">>,
|
||||
<<"description">>,
|
||||
<<"parameters">>
|
||||
],
|
||||
TopMap = maps:with(TopKeys, RawConfig),
|
||||
RestMap = maps:without(TopKeys, RawConfig),
|
||||
%% Other parameters should be stuffed into `parameters'
|
||||
emqx_utils_maps:update_if_present(
|
||||
<<"parameters">>,
|
||||
fun(Old) -> emqx_utils_maps:deep_merge(Old, RestMap) end,
|
||||
TopMap
|
||||
).
|
||||
|
||||
%% ====================================================================
|
||||
%% Internal functions for building the info map and accessing it
|
||||
%% ====================================================================
|
||||
|
||||
get_action_info_module(ActionOrBridgeType) ->
|
||||
InfoMap = info_map(),
|
||||
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
|
||||
maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined).
|
||||
|
||||
internal_emqx_action_persistent_term_info_key() ->
|
||||
?FUNCTION_NAME.
|
||||
|
||||
|
@ -162,7 +218,8 @@ initial_info_map() ->
|
|||
bridge_v1_type_to_action_type => #{},
|
||||
action_type_to_bridge_v1_type => #{},
|
||||
action_type_to_connector_type => #{},
|
||||
action_type_to_schema_module => #{}
|
||||
action_type_to_schema_module => #{},
|
||||
action_type_to_info_module => #{}
|
||||
}.
|
||||
|
||||
get_info_map(Module) ->
|
||||
|
@ -196,5 +253,9 @@ get_info_map(Module) ->
|
|||
},
|
||||
action_type_to_schema_module => #{
|
||||
ActionType => Module:schema_module()
|
||||
},
|
||||
action_type_to_info_module => #{
|
||||
ActionType => Module,
|
||||
BridgeV1Type => Module
|
||||
}
|
||||
}.
|
||||
|
|
|
@ -900,7 +900,7 @@ format_resource(
|
|||
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
||||
true ->
|
||||
%% The defaults are already filled in
|
||||
downgrade_raw_conf(Type, RawConf);
|
||||
RawConf;
|
||||
false ->
|
||||
fill_defaults(Type, RawConf)
|
||||
end,
|
||||
|
@ -1164,19 +1164,3 @@ upgrade_type(Type) ->
|
|||
|
||||
downgrade_type(Type) ->
|
||||
emqx_bridge_lib:downgrade_type(Type).
|
||||
|
||||
%% TODO: move it to callback
|
||||
downgrade_raw_conf(kafka_producer, RawConf) ->
|
||||
rename(<<"parameters">>, <<"kafka">>, RawConf);
|
||||
downgrade_raw_conf(azure_event_hub_producer, RawConf) ->
|
||||
rename(<<"parameters">>, <<"kafka">>, RawConf);
|
||||
downgrade_raw_conf(_Type, RawConf) ->
|
||||
RawConf.
|
||||
|
||||
rename(OldKey, NewKey, Map) ->
|
||||
case maps:find(OldKey, Map) of
|
||||
{ok, Value} ->
|
||||
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
||||
error ->
|
||||
Map
|
||||
end.
|
||||
|
|
|
@ -1104,7 +1104,8 @@ bridge_v1_lookup_and_transform_helper(
|
|||
),
|
||||
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
||||
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2),
|
||||
BridgeV1Config3 = emqx_action_info:action_to_bridge_v1_fixup(BridgeV2Type, BridgeV1Config2),
|
||||
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config3, BridgeV2),
|
||||
BridgeV1 = maps:remove(status, BridgeV1Tmp),
|
||||
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
||||
BridgeV2Error = maps:get(error, BridgeV2, undefined),
|
||||
|
|
|
@ -40,6 +40,8 @@
|
|||
|
||||
-export([types/0, types_sc/0]).
|
||||
|
||||
-export([make_producer_action_schema/1, make_consumer_action_schema/1]).
|
||||
|
||||
-export_type([action_type/0]).
|
||||
|
||||
%% Should we explicitly list them here so dialyzer may be more helpful?
|
||||
|
@ -116,7 +118,9 @@ roots() ->
|
|||
end.
|
||||
|
||||
fields(actions) ->
|
||||
registered_schema_fields().
|
||||
registered_schema_fields();
|
||||
fields(resource_opts) ->
|
||||
emqx_resource_schema:create_opts(_Overrides = []).
|
||||
|
||||
registered_schema_fields() ->
|
||||
[
|
||||
|
@ -150,6 +154,29 @@ examples(Method) ->
|
|||
SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()],
|
||||
lists:foldl(Fun, #{}, SchemaModules).
|
||||
|
||||
%%======================================================================================
|
||||
%% Helper functions for making HOCON Schema
|
||||
%%======================================================================================
|
||||
|
||||
make_producer_action_schema(ActionParametersRef) ->
|
||||
[
|
||||
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}
|
||||
| make_consumer_action_schema(ActionParametersRef)
|
||||
].
|
||||
|
||||
make_consumer_action_schema(ActionParametersRef) ->
|
||||
[
|
||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||
{connector,
|
||||
mk(binary(), #{
|
||||
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
|
||||
})},
|
||||
{description, emqx_schema:description_schema()},
|
||||
{parameters, ActionParametersRef},
|
||||
{resource_opts,
|
||||
mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
|
||||
].
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("hocon/include/hocon_types.hrl").
|
||||
schema_homogeneous_test() ->
|
||||
|
|
|
@ -60,15 +60,7 @@ init_per_testcase(_TestCase, Config) ->
|
|||
ets:new(fun_table_name(), [named_table, public]),
|
||||
%% Create a fake connector
|
||||
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
||||
[
|
||||
{mocked_mods, [
|
||||
emqx_connector_schema,
|
||||
emqx_connector_resource,
|
||||
|
||||
emqx_bridge_v2
|
||||
]}
|
||||
| Config
|
||||
].
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ets:delete(fun_table_name()),
|
||||
|
|
|
@ -10,7 +10,9 @@
|
|||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
schema_module/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> azure_event_hub_producer.
|
||||
|
@ -20,3 +22,9 @@ action_type_name() -> azure_event_hub_producer.
|
|||
connector_type_name() -> azure_event_hub_producer.
|
||||
|
||||
schema_module() -> emqx_bridge_azure_event_hub.
|
||||
|
||||
action_to_bridge_v1_fixup(Config) ->
|
||||
emqx_bridge_kafka_action_info:action_to_bridge_v1_fixup(Config).
|
||||
|
||||
bridge_v1_to_action_fixup(Config) ->
|
||||
emqx_bridge_kafka_action_info:bridge_v1_to_action_fixup(Config).
|
||||
|
|
|
@ -610,7 +610,7 @@ producer_opts() ->
|
|||
].
|
||||
|
||||
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
|
||||
%% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0)
|
||||
%% However we need to keep it backward compatible for generated schema json (version 0.1.0)
|
||||
%% since schema is data for the 'schemas' API.
|
||||
parameters_field() ->
|
||||
{Name, Alias} =
|
||||
|
|
|
@ -10,7 +10,9 @@
|
|||
bridge_v1_type_name/0,
|
||||
action_type_name/0,
|
||||
connector_type_name/0,
|
||||
schema_module/0
|
||||
schema_module/0,
|
||||
action_to_bridge_v1_fixup/1,
|
||||
bridge_v1_to_action_fixup/1
|
||||
]).
|
||||
|
||||
bridge_v1_type_name() -> kafka.
|
||||
|
@ -20,3 +22,24 @@ action_type_name() -> kafka_producer.
|
|||
connector_type_name() -> kafka_producer.
|
||||
|
||||
schema_module() -> emqx_bridge_kafka.
|
||||
|
||||
action_to_bridge_v1_fixup(Config) ->
|
||||
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, Config).
|
||||
|
||||
bridge_v1_to_action_fixup(Config0) ->
|
||||
Config = emqx_utils_maps:rename(<<"kafka">>, <<"parameters">>, Config0),
|
||||
maps:with(producer_action_field_keys(), Config).
|
||||
|
||||
%%------------------------------------------------------------------------------------------
|
||||
%% Internal helper fns
|
||||
%%------------------------------------------------------------------------------------------
|
||||
|
||||
producer_action_field_keys() ->
|
||||
[
|
||||
to_bin(K)
|
||||
|| {K, _} <- emqx_bridge_kafka:fields(kafka_producer_action)
|
||||
].
|
||||
|
||||
to_bin(B) when is_binary(B) -> B;
|
||||
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
||||
|
|
|
@ -98,16 +98,16 @@ bridge_configs_to_transform(
|
|||
end.
|
||||
|
||||
split_bridge_to_connector_and_action(
|
||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}}
|
||||
{ConnectorsMap, {BridgeType, BridgeName, ActionConf, ConnectorFields, PreviousRawConfig}}
|
||||
) ->
|
||||
%% Get connector fields from bridge config
|
||||
ConnectorMap = lists:foldl(
|
||||
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
||||
case maps:is_key(to_bin(ConnectorFieldName), ActionConf) of
|
||||
true ->
|
||||
NewToTransform = maps:put(
|
||||
to_bin(ConnectorFieldName),
|
||||
maps:get(to_bin(ConnectorFieldName), BridgeConf),
|
||||
maps:get(to_bin(ConnectorFieldName), ActionConf),
|
||||
ToTransformSoFar
|
||||
),
|
||||
NewToTransform;
|
||||
|
@ -118,23 +118,6 @@ split_bridge_to_connector_and_action(
|
|||
#{},
|
||||
ConnectorFields
|
||||
),
|
||||
%% Remove connector fields from bridge config to create Action
|
||||
ActionMap0 = lists:foldl(
|
||||
fun
|
||||
({enable, _Spec}, ToTransformSoFar) ->
|
||||
%% Enable filed is used in both
|
||||
ToTransformSoFar;
|
||||
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
||||
true ->
|
||||
maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar);
|
||||
false ->
|
||||
ToTransformSoFar
|
||||
end
|
||||
end,
|
||||
BridgeConf,
|
||||
ConnectorFields
|
||||
),
|
||||
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
||||
ConnectorName =
|
||||
case PreviousRawConfig of
|
||||
|
@ -142,7 +125,7 @@ split_bridge_to_connector_and_action(
|
|||
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
||||
end,
|
||||
%% Add connector field to action map
|
||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionConf),
|
||||
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
||||
|
||||
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
||||
|
@ -191,7 +174,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
|||
),
|
||||
%% Add connectors and actions and remove bridges
|
||||
lists:foldl(
|
||||
fun({BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
||||
fun({BridgeType, BridgeName, ActionMap0, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
||||
%% Add connector
|
||||
RawConfigSoFar1 = emqx_utils_maps:deep_put(
|
||||
[<<"connectors">>, to_bin(ConnectorType), ConnectorName],
|
||||
|
@ -203,6 +186,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
|||
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
||||
RawConfigSoFar1
|
||||
),
|
||||
ActionMap = emqx_action_info:bridge_v1_to_action_fixup(BridgeType, ActionMap0),
|
||||
%% Add action
|
||||
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
||||
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
||||
|
|
|
@ -34,7 +34,8 @@
|
|||
best_effort_recursive_sum/3,
|
||||
if_only_to_toggle_enable/2,
|
||||
update_if_present/3,
|
||||
put_if/4
|
||||
put_if/4,
|
||||
rename/3
|
||||
]).
|
||||
|
||||
-export_type([config_key/0, config_key_path/0]).
|
||||
|
@ -309,3 +310,11 @@ put_if(Acc, K, V, true) ->
|
|||
Acc#{K => V};
|
||||
put_if(Acc, _K, _V, false) ->
|
||||
Acc.
|
||||
|
||||
rename(OldKey, NewKey, Map) ->
|
||||
case maps:find(OldKey, Map) of
|
||||
{ok, Value} ->
|
||||
maps:put(NewKey, Value, maps:remove(OldKey, Map));
|
||||
error ->
|
||||
Map
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue