From 84ff7b0b3849d139e068be36e3bdde507b4174bc Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 17 Nov 2023 11:23:53 +0100 Subject: [PATCH] feat(emqx_bridge): action_info with dynamic lookup This allows a n:1 relation between v1 bridge_types to action/connector types as it's the case with mongodb for instance, where we had `mongodb_single` `mongodb_sharded` etc and the new implementation will just have `mongodb`. --- apps/emqx_bridge/src/emqx_action_info.erl | 89 +++++++++++++------ apps/emqx_bridge/src/emqx_bridge_api.erl | 6 +- apps/emqx_bridge/src/emqx_bridge_lib.erl | 22 +++-- apps/emqx_bridge/src/emqx_bridge_v2.erl | 6 +- .../src/emqx_bridge_kafka_impl_producer.erl | 7 +- .../src/emqx_rule_engine_api.erl | 24 ++++- 6 files changed, 108 insertions(+), 46 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index b5d88c4d8..02ed2fda8 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -22,7 +22,7 @@ -export([ action_type_to_connector_type/1, - action_type_to_bridge_v1_type/1, + action_type_to_bridge_v1_type/2, bridge_v1_type_to_action_type/1, is_action_type/1, registered_schema_modules/0, @@ -30,7 +30,12 @@ bridge_v1_to_action_fixup/2 ]). --callback bridge_v1_type_name() -> atom(). +-callback bridge_v1_type_name() -> + atom() + | { + fun(({ActionConfig :: map(), ConnectorConfig :: map()}) -> Type :: atom()), + TypeList :: [atom()] + }. -callback action_type_name() -> atom(). -callback connector_type_name() -> atom(). -callback schema_module() -> atom(). @@ -93,16 +98,22 @@ bridge_v1_type_to_action_type(Type) -> ActionType -> ActionType end. -action_type_to_bridge_v1_type(Bin) when is_binary(Bin) -> - action_type_to_bridge_v1_type(binary_to_existing_atom(Bin)); -action_type_to_bridge_v1_type(Type) -> +action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) -> + action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf); +action_type_to_bridge_v1_type(ActionType, Conf) -> ActionInfoMap = info_map(), ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap), - case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of - undefined -> Type; + case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of + undefined -> ActionType; + BridgeV1TypeFun when is_function(BridgeV1TypeFun) -> BridgeV1TypeFun(get_confs(Conf)); BridgeV1Type -> BridgeV1Type end. +get_confs(#{connector := ConnectorName, type := ActionType} = ActionConfig) -> + ConnectorType = action_type_to_connector_type(ActionType), + ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]), + {ActionConfig, ConnectorConfig}. + %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% types. For everything else the function should return false. @@ -226,36 +237,56 @@ get_info_map(Module) -> %% Force the module to get loaded _ = code:ensure_loaded(Module), ActionType = Module:action_type_name(), - BridgeV1Type = + {BridgeV1TypeOrFun, BridgeV1Types} = case erlang:function_exported(Module, bridge_v1_type_name, 0) of true -> - Module:bridge_v1_type_name(); + case Module:bridge_v1_type_name() of + {_BridgeV1TypeFun, _BridgeV1Types} = BridgeV1TypeTuple -> + BridgeV1TypeTuple; + BridgeV1Type0 -> + {BridgeV1Type0, [BridgeV1Type0]} + end; false -> - Module:action_type_name() + {ActionType, [ActionType]} end, #{ - action_type_names => #{ - ActionType => true, - BridgeV1Type => true - }, - bridge_v1_type_to_action_type => #{ - BridgeV1Type => ActionType, - %% Alias the bridge V1 type to the action type - ActionType => ActionType - }, + action_type_names => + lists:foldl( + fun(BridgeV1Type, M) -> + M#{BridgeV1Type => true} + end, + #{ActionType => true}, + BridgeV1Types + ), + bridge_v1_type_to_action_type => + lists:foldl( + fun(BridgeV1Type, M) -> + %% Alias the bridge V1 type to the action type + M#{BridgeV1Type => ActionType} + end, + #{ActionType => ActionType}, + BridgeV1Types + ), action_type_to_bridge_v1_type => #{ - ActionType => BridgeV1Type - }, - action_type_to_connector_type => #{ - ActionType => Module:connector_type_name(), - %% Alias the bridge V1 type to the action type - BridgeV1Type => Module:connector_type_name() + ActionType => BridgeV1TypeOrFun }, + action_type_to_connector_type => + lists:foldl( + fun(BridgeV1Type, M) -> + M#{BridgeV1Type => Module:connector_type_name()} + end, + #{ActionType => Module:connector_type_name()}, + BridgeV1Types + ), action_type_to_schema_module => #{ ActionType => Module:schema_module() }, - action_type_to_info_module => #{ - ActionType => Module, - BridgeV1Type => Module - } + action_type_to_info_module => + lists:foldl( + fun(BridgeV1Type, M) -> + M#{BridgeV1Type => Module} + end, + #{ActionType => Module}, + BridgeV1Types + ) }. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 188f26ab5..a3c058abb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -907,7 +907,7 @@ format_resource( redact( maps:merge( RawConfFull#{ - type => downgrade_type(Type), + type => downgrade_type(Type, RawConf), name => maps:get(<<"name">>, RawConf, BridgeName), node => Node }, @@ -1162,5 +1162,5 @@ non_compat_bridge_msg() -> upgrade_type(Type) -> emqx_bridge_lib:upgrade_type(Type). -downgrade_type(Type) -> - emqx_bridge_lib:downgrade_type(Type). +downgrade_type(Type, Conf) -> + emqx_bridge_lib:downgrade_type(Type, Conf). diff --git a/apps/emqx_bridge/src/emqx_bridge_lib.erl b/apps/emqx_bridge/src/emqx_bridge_lib.erl index 4be605745..04b3378ce 100644 --- a/apps/emqx_bridge/src/emqx_bridge_lib.erl +++ b/apps/emqx_bridge/src/emqx_bridge_lib.erl @@ -18,7 +18,7 @@ -export([ maybe_withdraw_rule_action/3, upgrade_type/1, - downgrade_type/1 + downgrade_type/2 ]). %% @doc A bridge can be used as a rule action. @@ -61,17 +61,17 @@ upgrade_type(Type) when is_list(Type) -> atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))). %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1 -downgrade_type(Type) when is_atom(Type) -> - emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type); -downgrade_type(Type) when is_binary(Type) -> - atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type)); -downgrade_type(Type) when is_list(Type) -> - atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))). +downgrade_type(Type, Conf) when is_atom(Type) -> + emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf); +downgrade_type(Type, Conf) when is_binary(Type) -> + atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf)); +downgrade_type(Type, Conf) when is_list(Type) -> + atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type), Conf)). %% A rule might be referencing an old version bridge type name %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both external_ids(Type, Name) -> - case downgrade_type(Type) of + case downgrade_type(Type, get_conf(Type, Name)) of Type -> [external_id(Type, Name)]; Type0 -> @@ -87,3 +87,9 @@ external_id(BridgeType, BridgeName) -> bin(Bin) when is_binary(Bin) -> Bin; bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +get_conf(BridgeType, BridgeName) -> + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]); + false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName]) + end. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index f8939df8c..aa96be19b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -111,7 +111,7 @@ bridge_v1_create_dry_run/2, bridge_v1_type_to_bridge_v2_type/1, %% Exception from the naming convention: - bridge_v2_type_to_bridge_v1_type/1, + bridge_v2_type_to_bridge_v1_type/2, bridge_v1_id_to_connector_resource_id/1, bridge_v1_enable_disable/3, bridge_v1_restart/2, @@ -1050,8 +1050,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) -> bridge_v1_type_to_bridge_v2_type(Type) -> emqx_action_info:bridge_v1_type_to_action_type(Type). -bridge_v2_type_to_bridge_v1_type(Type) -> - emqx_action_info:action_type_to_bridge_v1_type(Type). +bridge_v2_type_to_bridge_v1_type(Type, Conf) -> + emqx_action_info:action_type_to_bridge_v1_type(Type, Conf). is_bridge_v2_type(Type) -> emqx_action_info:is_action_type(Type). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 4422d8dd5..e5821e6c7 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -621,8 +621,13 @@ partitioner(random) -> random; partitioner(key_dispatch) -> first_key_dispatch. replayq_dir(BridgeType, BridgeName) -> + RawConf = emqx_conf:get_raw([actions, BridgeType, BridgeName]), DirName = iolist_to_binary([ - emqx_bridge_lib:downgrade_type(BridgeType), ":", BridgeName, ":", atom_to_list(node()) + emqx_bridge_lib:downgrade_type(BridgeType, RawConf), + ":", + BridgeName, + ":", + atom_to_list(node()) ]), filename:join([emqx:data_dir(), "kafka", DirName]). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 1e978828b..b24662e53 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -521,8 +521,9 @@ format_action(Actions) -> do_format_action({bridge, BridgeType, BridgeName, _ResId}) -> emqx_bridge_resource:bridge_id(BridgeType, BridgeName); -do_format_action({bridge_v2, BridgeType, BridgeName}) -> - emqx_bridge_resource:bridge_id(emqx_bridge_lib:downgrade_type(BridgeType), BridgeName); +do_format_action({bridge_v2, BridgeType0, BridgeName}) -> + BridgeType = try_downgrade(BridgeType0, BridgeName), + emqx_bridge_resource:bridge_id(BridgeType, BridgeName); do_format_action(#{mod := Mod, func := Func, args := Args}) -> #{ function => printable_function_name(Mod, Func), @@ -533,6 +534,25 @@ do_format_action(#{mod := Mod, func := Func}) -> function => printable_function_name(Mod, Func) }. +try_downgrade(BridgeType, BridgeName) -> + Conf = try_get_conf(BridgeType, BridgeName), + try emqx_bridge_lib:downgrade_type(BridgeType, Conf) of + DowngradedBridgeType -> + DowngradedBridgeType + catch + error:{config_not_found, _} -> + BridgeType + end. + +try_get_conf(BridgeType, BridgeName) -> + try emqx_conf:get_raw([actions, BridgeType, BridgeName]) of + RawConf -> + RawConf + catch + error:{config_not_found, _} -> + #{} + end. + printable_function_name(emqx_rule_actions, Func) -> Func; printable_function_name(Mod, Func) ->