fix: use `_producer` for AEH bridge type

This commit is contained in:
Stefan Strigler 2023-10-30 09:34:56 +01:00
parent aea449306a
commit 176bbe88bc
6 changed files with 29 additions and 26 deletions

View File

@ -737,8 +737,8 @@ bridge_v2_type_to_connector_type(kafka) ->
kafka_producer; kafka_producer;
bridge_v2_type_to_connector_type(kafka_producer) -> bridge_v2_type_to_connector_type(kafka_producer) ->
kafka_producer; kafka_producer;
bridge_v2_type_to_connector_type(azure_event_hub) -> bridge_v2_type_to_connector_type(azure_event_hub_producer) ->
azure_event_hub. azure_event_hub_producer.
%%==================================================================== %%====================================================================
%% Data backup API %% Data backup API
@ -964,8 +964,8 @@ bridge_v1_type_to_bridge_v2_type(kafka) ->
kafka_producer; kafka_producer;
bridge_v1_type_to_bridge_v2_type(kafka_producer) -> bridge_v1_type_to_bridge_v2_type(kafka_producer) ->
kafka_producer; kafka_producer;
bridge_v1_type_to_bridge_v2_type(azure_event_hub) -> bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) ->
azure_event_hub. azure_event_hub_producer.
%% This function should return true for all inputs that are bridge V1 types for %% This function should return true for all inputs that are bridge V1 types for
%% bridges that have been refactored to bridge V2s, and for all all bridge V2 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
@ -976,7 +976,7 @@ is_bridge_v2_type(<<"kafka_producer">>) ->
true; true;
is_bridge_v2_type(<<"kafka">>) -> is_bridge_v2_type(<<"kafka">>) ->
true; true;
is_bridge_v2_type(<<"azure_event_hub">>) -> is_bridge_v2_type(<<"azure_event_hub_producer">>) ->
true; true;
is_bridge_v2_type(_) -> is_bridge_v2_type(_) ->
false. false.
@ -1385,19 +1385,20 @@ to_existing_atom(X) ->
{error, _} -> throw(bad_atom) {error, _} -> throw(bad_atom)
end. end.
validate_referenced_connectors(Type0, ConnectorName0, BridgeName) -> validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) ->
%% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
%% identical to its matching connector type name. %% identical to its matching connector type name.
try try
Type = to_existing_atom(Type0), ConnectorType = bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)),
ConnectorName = to_existing_atom(ConnectorName0), ConnectorName = to_existing_atom(ConnectorNameBin),
case emqx_config:get([connectors, Type, ConnectorName], undefined) of case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
undefined -> undefined ->
{error, #{ {error, #{
reason => "connector_not_found_or_wrong_type", reason => "connector_not_found_or_wrong_type",
type => Type, connector_name => ConnectorName,
connectortype => ConnectorType,
bridge_name => BridgeName, bridge_name => BridgeName,
connector_name => ConnectorName bridge_type => BridgeType
}}; }};
_ -> _ ->
ok ok
@ -1406,9 +1407,9 @@ validate_referenced_connectors(Type0, ConnectorName0, BridgeName) ->
throw:bad_atom -> throw:bad_atom ->
{error, #{ {error, #{
reason => "connector_not_found_or_wrong_type", reason => "connector_not_found_or_wrong_type",
type => Type0, type => BridgeType,
bridge_name => BridgeName, bridge_name => BridgeName,
connector_name => ConnectorName0 connector_name => ConnectorNameBin
}} }}
end. end.

View File

@ -44,7 +44,7 @@ bridge_v2_structs() ->
required => false required => false
} }
)}, )},
{azure_event_hub, {azure_event_hub_producer,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)), hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)),
#{ #{
@ -57,7 +57,7 @@ bridge_v2_structs() ->
api_schemas(Method) -> api_schemas(Method) ->
[ [
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2") api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2")
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->

View File

@ -31,8 +31,8 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-define(AEH_CONNECTOR_TYPE, azure_event_hub). -define(AEH_CONNECTOR_TYPE, azure_event_hub_producer).
-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>). -define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
%%------------------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API %% `hocon_schema' API

View File

@ -10,8 +10,10 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, azure_event_hub). -define(BRIDGE_TYPE, azure_event_hub_producer).
-define(BRIDGE_TYPE_BIN, <<"azure_event_hub">>). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(CONNECTOR_TYPE, azure_event_hub_producer).
-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka_producer). -define(KAFKA_BRIDGE_TYPE, kafka_producer).
-define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]). -define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]).
@ -88,7 +90,7 @@ common_init_per_testcase(TestCase, Config) ->
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
ExtraConfig ++ ExtraConfig ++
[ [
{connector_type, ?BRIDGE_TYPE}, {connector_type, ?CONNECTOR_TYPE},
{connector_name, Name}, {connector_name, Name},
{connector_config, ConnectorConfig}, {connector_config, ConnectorConfig},
{bridge_type, ?BRIDGE_TYPE}, {bridge_type, ?BRIDGE_TYPE},
@ -156,7 +158,7 @@ connector_config(Name, KafkaHost, KafkaPort) ->
parse_and_check_connector_config(InnerConfigMap, Name). parse_and_check_connector_config(InnerConfigMap, Name).
parse_and_check_connector_config(InnerConfigMap, Name) -> parse_and_check_connector_config(InnerConfigMap, Name) ->
TypeBin = ?BRIDGE_TYPE_BIN, TypeBin = ?CONNECTOR_TYPE_BIN,
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}}, RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} = #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{ hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{

View File

@ -23,7 +23,7 @@ resource_type(Type) when is_binary(Type) ->
resource_type(kafka_producer) -> resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
%% We use AEH's Kafka interface. %% We use AEH's Kafka interface.
resource_type(azure_event_hub) -> resource_type(azure_event_hub_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(Type) -> resource_type(Type) ->
error({unknown_connector_type, Type}). error({unknown_connector_type, Type}).
@ -31,7 +31,7 @@ resource_type(Type) ->
%% For connectors that need to override connector configurations. %% For connectors that need to override connector configurations.
connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
connector_impl_module(binary_to_atom(ConnectorType, utf8)); connector_impl_module(binary_to_atom(ConnectorType, utf8));
connector_impl_module(azure_event_hub) -> connector_impl_module(azure_event_hub_producer) ->
emqx_bridge_azure_event_hub; emqx_bridge_azure_event_hub;
connector_impl_module(_ConnectorType) -> connector_impl_module(_ConnectorType) ->
undefined. undefined.
@ -49,7 +49,7 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{azure_event_hub, {azure_event_hub_producer,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")), hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
#{ #{
@ -82,7 +82,7 @@ api_schemas(Method) ->
%% We need to map the `type' field of a request (binary) to a %% We need to map the `type' field of a request (binary) to a
%% connector schema module. %% connector schema module.
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector") api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector")
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->

View File

@ -57,7 +57,7 @@ enterprise_fields_connectors() -> [].
-endif. -endif.
connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub]. connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
actions_config_name() -> <<"bridges_v2">>. actions_config_name() -> <<"bridges_v2">>.