refactor: address review comments and avoid transformations without schema knowledge
This commit is contained in:
parent
86c126ffcd
commit
eb3f54184e
|
@ -35,9 +35,9 @@
|
||||||
-callback connector_type_name() -> atom().
|
-callback connector_type_name() -> atom().
|
||||||
-callback schema_module() -> atom().
|
-callback schema_module() -> atom().
|
||||||
%% Define this if the automatic config downgrade is not enough for the bridge.
|
%% Define this if the automatic config downgrade is not enough for the bridge.
|
||||||
-callback action_to_bridge_v1_fixup(map()) -> term().
|
-callback action_to_bridge_v1_fixup(map()) -> map().
|
||||||
%% Define this if the automatic config upgrade is not enough for the bridge.
|
%% Define this if the automatic config upgrade is not enough for the bridge.
|
||||||
-callback bridge_v1_to_action_fixup(map()) -> term().
|
-callback bridge_v1_to_action_fixup(map()) -> map().
|
||||||
|
|
||||||
-optional_callbacks([
|
-optional_callbacks([
|
||||||
bridge_v1_type_name/0,
|
bridge_v1_type_name/0,
|
||||||
|
@ -130,15 +130,38 @@ action_to_bridge_v1_fixup(ActionOrBridgeType, Config) ->
|
||||||
Config
|
Config
|
||||||
end.
|
end.
|
||||||
|
|
||||||
bridge_v1_to_action_fixup(ActionOrBridgeType, Config) ->
|
bridge_v1_to_action_fixup(ActionOrBridgeType, Config0) ->
|
||||||
Module = get_action_info_module(ActionOrBridgeType),
|
Module = get_action_info_module(ActionOrBridgeType),
|
||||||
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of
|
case erlang:function_exported(Module, bridge_v1_to_action_fixup, 1) of
|
||||||
true ->
|
true ->
|
||||||
Module:bridge_v1_to_action_fixup(Config);
|
Config1 = Module:bridge_v1_to_action_fixup(Config0),
|
||||||
|
common_bridge_v1_to_action_adapter(Config1);
|
||||||
false ->
|
false ->
|
||||||
Config
|
common_bridge_v1_to_action_adapter(Config0)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% ====================================================================
|
||||||
|
%% Helper fns
|
||||||
|
%% ====================================================================
|
||||||
|
|
||||||
|
common_bridge_v1_to_action_adapter(RawConfig) ->
|
||||||
|
TopKeys = [
|
||||||
|
<<"enable">>,
|
||||||
|
<<"connector">>,
|
||||||
|
<<"local_topic">>,
|
||||||
|
<<"resource_opts">>,
|
||||||
|
<<"description">>,
|
||||||
|
<<"parameters">>
|
||||||
|
],
|
||||||
|
TopMap = maps:with(TopKeys, RawConfig),
|
||||||
|
RestMap = maps:without(TopKeys, RawConfig),
|
||||||
|
%% Other parameters should be stuffed into `parameters'
|
||||||
|
emqx_utils_maps:update_if_present(
|
||||||
|
<<"parameters">>,
|
||||||
|
fun(Old) -> emqx_utils_maps:deep_merge(Old, RestMap) end,
|
||||||
|
TopMap
|
||||||
|
).
|
||||||
|
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
%% Internal functions for building the info map and accessing it
|
%% Internal functions for building the info map and accessing it
|
||||||
%% ====================================================================
|
%% ====================================================================
|
||||||
|
@ -146,7 +169,7 @@ bridge_v1_to_action_fixup(ActionOrBridgeType, Config) ->
|
||||||
get_action_info_module(ActionOrBridgeType) ->
|
get_action_info_module(ActionOrBridgeType) ->
|
||||||
InfoMap = info_map(),
|
InfoMap = info_map(),
|
||||||
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
|
ActionInfoModuleMap = maps:get(action_type_to_info_module, InfoMap),
|
||||||
maps:get(ActionOrBridgeType, ActionInfoModuleMap).
|
maps:get(ActionOrBridgeType, ActionInfoModuleMap, undefined).
|
||||||
|
|
||||||
internal_emqx_action_persistent_term_info_key() ->
|
internal_emqx_action_persistent_term_info_key() ->
|
||||||
?FUNCTION_NAME.
|
?FUNCTION_NAME.
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
-export([types/0, types_sc/0]).
|
-export([types/0, types_sc/0]).
|
||||||
|
|
||||||
-export([make_action_schema/1]).
|
-export([make_producer_action_schema/1, make_consumer_action_schema/1]).
|
||||||
|
|
||||||
-export_type([action_type/0]).
|
-export_type([action_type/0]).
|
||||||
|
|
||||||
|
@ -158,7 +158,13 @@ examples(Method) ->
|
||||||
%% Helper functions for making HOCON Schema
|
%% Helper functions for making HOCON Schema
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
|
||||||
make_action_schema(ActionParametersRef) ->
|
make_producer_action_schema(ActionParametersRef) ->
|
||||||
|
[
|
||||||
|
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}
|
||||||
|
| make_consumer_action_schema(ActionParametersRef)
|
||||||
|
].
|
||||||
|
|
||||||
|
make_consumer_action_schema(ActionParametersRef) ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{connector,
|
{connector,
|
||||||
|
@ -166,7 +172,6 @@ make_action_schema(ActionParametersRef) ->
|
||||||
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
|
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
|
||||||
})},
|
})},
|
||||||
{description, emqx_schema:description_schema()},
|
{description, emqx_schema:description_schema()},
|
||||||
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
|
|
||||||
{parameters, ActionParametersRef},
|
{parameters, ActionParametersRef},
|
||||||
{resource_opts,
|
{resource_opts,
|
||||||
mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
|
mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
|
||||||
|
|
|
@ -60,15 +60,7 @@ init_per_testcase(_TestCase, Config) ->
|
||||||
ets:new(fun_table_name(), [named_table, public]),
|
ets:new(fun_table_name(), [named_table, public]),
|
||||||
%% Create a fake connector
|
%% Create a fake connector
|
||||||
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
||||||
[
|
Config.
|
||||||
{mocked_mods, [
|
|
||||||
emqx_connector_schema,
|
|
||||||
emqx_connector_resource,
|
|
||||||
|
|
||||||
emqx_bridge_v2
|
|
||||||
]}
|
|
||||||
| Config
|
|
||||||
].
|
|
||||||
|
|
||||||
end_per_testcase(_TestCase, _Config) ->
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
ets:delete(fun_table_name()),
|
ets:delete(fun_table_name()),
|
||||||
|
|
|
@ -24,17 +24,7 @@ connector_type_name() -> azure_event_hub_producer.
|
||||||
schema_module() -> emqx_bridge_azure_event_hub.
|
schema_module() -> emqx_bridge_azure_event_hub.
|
||||||
|
|
||||||
action_to_bridge_v1_fixup(Config) ->
|
action_to_bridge_v1_fixup(Config) ->
|
||||||
rename(<<"parameters">>, <<"kafka">>, Config).
|
emqx_bridge_kafka_action_info:action_to_bridge_v1_fixup(Config).
|
||||||
|
|
||||||
rename(OldKey, NewKey, Map) ->
|
|
||||||
case maps:find(OldKey, Map) of
|
|
||||||
{ok, Value} ->
|
|
||||||
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
|
||||||
error ->
|
|
||||||
Map
|
|
||||||
end.
|
|
||||||
|
|
||||||
bridge_v1_to_action_fixup(Config) ->
|
bridge_v1_to_action_fixup(Config) ->
|
||||||
KafkaField = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config, #{}),
|
emqx_bridge_kafka_action_info:bridge_v1_to_action_fixup(Config).
|
||||||
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config),
|
|
||||||
emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaField}).
|
|
||||||
|
|
|
@ -24,17 +24,22 @@ connector_type_name() -> kafka_producer.
|
||||||
schema_module() -> emqx_bridge_kafka.
|
schema_module() -> emqx_bridge_kafka.
|
||||||
|
|
||||||
action_to_bridge_v1_fixup(Config) ->
|
action_to_bridge_v1_fixup(Config) ->
|
||||||
rename(<<"parameters">>, <<"kafka">>, Config).
|
emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, Config).
|
||||||
|
|
||||||
rename(OldKey, NewKey, Map) ->
|
bridge_v1_to_action_fixup(Config0) ->
|
||||||
case maps:find(OldKey, Map) of
|
Config = emqx_utils_maps:rename(<<"kafka">>, <<"parameters">>, Config0),
|
||||||
{ok, Value} ->
|
maps:with(producer_action_field_keys(), Config).
|
||||||
maps:remove(OldKey, maps:put(NewKey, Value, Map));
|
|
||||||
error ->
|
|
||||||
Map
|
|
||||||
end.
|
|
||||||
|
|
||||||
bridge_v1_to_action_fixup(Config) ->
|
%%------------------------------------------------------------------------------------------
|
||||||
KafkaField = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config, #{}),
|
%% Internal helper fns
|
||||||
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config),
|
%%------------------------------------------------------------------------------------------
|
||||||
emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaField}).
|
|
||||||
|
producer_action_field_keys() ->
|
||||||
|
[
|
||||||
|
to_bin(K)
|
||||||
|
|| {K, _} <- emqx_bridge_kafka:fields(kafka_producer_action)
|
||||||
|
].
|
||||||
|
|
||||||
|
to_bin(B) when is_binary(B) -> B;
|
||||||
|
to_bin(L) when is_list(L) -> list_to_binary(L);
|
||||||
|
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
|
||||||
|
|
|
@ -98,16 +98,16 @@ bridge_configs_to_transform(
|
||||||
end.
|
end.
|
||||||
|
|
||||||
split_bridge_to_connector_and_action(
|
split_bridge_to_connector_and_action(
|
||||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}}
|
{ConnectorsMap, {BridgeType, BridgeName, ActionConf, ConnectorFields, PreviousRawConfig}}
|
||||||
) ->
|
) ->
|
||||||
%% Get connector fields from bridge config
|
%% Get connector fields from bridge config
|
||||||
ConnectorMap = lists:foldl(
|
ConnectorMap = lists:foldl(
|
||||||
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
||||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
case maps:is_key(to_bin(ConnectorFieldName), ActionConf) of
|
||||||
true ->
|
true ->
|
||||||
NewToTransform = maps:put(
|
NewToTransform = maps:put(
|
||||||
to_bin(ConnectorFieldName),
|
to_bin(ConnectorFieldName),
|
||||||
maps:get(to_bin(ConnectorFieldName), BridgeConf),
|
maps:get(to_bin(ConnectorFieldName), ActionConf),
|
||||||
ToTransformSoFar
|
ToTransformSoFar
|
||||||
),
|
),
|
||||||
NewToTransform;
|
NewToTransform;
|
||||||
|
@ -118,23 +118,6 @@ split_bridge_to_connector_and_action(
|
||||||
#{},
|
#{},
|
||||||
ConnectorFields
|
ConnectorFields
|
||||||
),
|
),
|
||||||
%% Remove connector fields from bridge config to create Action
|
|
||||||
ActionMap0 = lists:foldl(
|
|
||||||
fun
|
|
||||||
({enable, _Spec}, ToTransformSoFar) ->
|
|
||||||
%% Enable filed is used in both
|
|
||||||
ToTransformSoFar;
|
|
||||||
({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
|
|
||||||
case maps:is_key(to_bin(ConnectorFieldName), BridgeConf) of
|
|
||||||
true ->
|
|
||||||
maps:remove(to_bin(ConnectorFieldName), ToTransformSoFar);
|
|
||||||
false ->
|
|
||||||
ToTransformSoFar
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
BridgeConf,
|
|
||||||
ConnectorFields
|
|
||||||
),
|
|
||||||
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
||||||
ConnectorName =
|
ConnectorName =
|
||||||
case PreviousRawConfig of
|
case PreviousRawConfig of
|
||||||
|
@ -142,7 +125,7 @@ split_bridge_to_connector_and_action(
|
||||||
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
||||||
end,
|
end,
|
||||||
%% Add connector field to action map
|
%% Add connector field to action map
|
||||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionConf),
|
||||||
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
||||||
|
|
||||||
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
||||||
|
@ -191,7 +174,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
||||||
),
|
),
|
||||||
%% Add connectors and actions and remove bridges
|
%% Add connectors and actions and remove bridges
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
fun({BridgeType, BridgeName, ActionMap0, ConnectorName, ConnectorMap}, RawConfigSoFar) ->
|
||||||
%% Add connector
|
%% Add connector
|
||||||
RawConfigSoFar1 = emqx_utils_maps:deep_put(
|
RawConfigSoFar1 = emqx_utils_maps:deep_put(
|
||||||
[<<"connectors">>, to_bin(ConnectorType), ConnectorName],
|
[<<"connectors">>, to_bin(ConnectorType), ConnectorName],
|
||||||
|
@ -203,21 +186,12 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
||||||
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
[<<"bridges">>, to_bin(BridgeType), BridgeName],
|
||||||
RawConfigSoFar1
|
RawConfigSoFar1
|
||||||
),
|
),
|
||||||
%% Take fields that should be in the top level of the action map
|
ActionMap = emqx_action_info:bridge_v1_to_action_fixup(BridgeType, ActionMap0),
|
||||||
TopLevelFields = [<<"resource_opts">>, <<"enable">>, <<"connector">>],
|
|
||||||
TopLevelActionFields = maps:with(TopLevelFields, ActionMap),
|
|
||||||
ParametersActionFields = maps:without(TopLevelFields, ActionMap),
|
|
||||||
%% Action map should be wrapped under parameters key
|
|
||||||
WrappedParameters = #{<<"parameters">> => ParametersActionFields},
|
|
||||||
FinalActionMap = maps:merge(TopLevelActionFields, WrappedParameters),
|
|
||||||
FixedActionMap = emqx_action_info:bridge_v1_to_action_fixup(
|
|
||||||
BridgeType, FinalActionMap
|
|
||||||
),
|
|
||||||
%% Add action
|
%% Add action
|
||||||
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
RawConfigSoFar3 = emqx_utils_maps:deep_put(
|
||||||
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
[actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
|
||||||
RawConfigSoFar2,
|
RawConfigSoFar2,
|
||||||
FixedActionMap
|
ActionMap
|
||||||
),
|
),
|
||||||
RawConfigSoFar3
|
RawConfigSoFar3
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -34,7 +34,8 @@
|
||||||
best_effort_recursive_sum/3,
|
best_effort_recursive_sum/3,
|
||||||
if_only_to_toggle_enable/2,
|
if_only_to_toggle_enable/2,
|
||||||
update_if_present/3,
|
update_if_present/3,
|
||||||
put_if/4
|
put_if/4,
|
||||||
|
rename/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([config_key/0, config_key_path/0]).
|
-export_type([config_key/0, config_key_path/0]).
|
||||||
|
@ -309,3 +310,11 @@ put_if(Acc, K, V, true) ->
|
||||||
Acc#{K => V};
|
Acc#{K => V};
|
||||||
put_if(Acc, _K, _V, false) ->
|
put_if(Acc, _K, _V, false) ->
|
||||||
Acc.
|
Acc.
|
||||||
|
|
||||||
|
rename(OldKey, NewKey, Map) ->
|
||||||
|
case maps:find(OldKey, Map) of
|
||||||
|
{ok, Value} ->
|
||||||
|
maps:put(NewKey, Value, maps:remove(OldKey, Map));
|
||||||
|
error ->
|
||||||
|
Map
|
||||||
|
end.
|
||||||
|
|
Loading…
Reference in New Issue