diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index b8aa62ef6..18f79a782 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -737,8 +737,8 @@ bridge_v2_type_to_connector_type(kafka) -> kafka_producer; bridge_v2_type_to_connector_type(kafka_producer) -> kafka_producer; -bridge_v2_type_to_connector_type(azure_event_hub) -> - azure_event_hub. +bridge_v2_type_to_connector_type(azure_event_hub_producer) -> + azure_event_hub_producer. %%==================================================================== %% Data backup API @@ -964,8 +964,8 @@ bridge_v1_type_to_bridge_v2_type(kafka) -> kafka_producer; bridge_v1_type_to_bridge_v2_type(kafka_producer) -> kafka_producer; -bridge_v1_type_to_bridge_v2_type(azure_event_hub) -> - azure_event_hub. +bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> + azure_event_hub_producer. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 @@ -976,7 +976,7 @@ is_bridge_v2_type(<<"kafka_producer">>) -> true; is_bridge_v2_type(<<"kafka">>) -> true; -is_bridge_v2_type(<<"azure_event_hub">>) -> +is_bridge_v2_type(<<"azure_event_hub_producer">>) -> true; is_bridge_v2_type(_) -> false. @@ -1385,33 +1385,37 @@ to_existing_atom(X) -> {error, _} -> throw(bad_atom) end. -validate_referenced_connectors(Type0, ConnectorName0, BridgeName) -> +validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) -> %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is %% identical to its matching connector type name. try - Type = to_existing_atom(Type0), - ConnectorName = to_existing_atom(ConnectorName0), - case emqx_config:get([connectors, Type, ConnectorName], undefined) of + {ConnectorName, ConnectorType} = to_connector(ConnectorNameBin, BridgeType), + case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of undefined -> - {error, #{ - reason => "connector_not_found_or_wrong_type", - type => Type, - bridge_name => BridgeName, - connector_name => ConnectorName - }}; + throw(not_found); _ -> ok end catch - throw:bad_atom -> + throw:not_found -> {error, #{ reason => "connector_not_found_or_wrong_type", - type => Type0, + connector_name => ConnectorNameBin, bridge_name => BridgeName, - connector_name => ConnectorName0 + bridge_type => BridgeType }} end. +to_connector(ConnectorNameBin, BridgeType) -> + try + ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)), + ConnectorName = to_existing_atom(ConnectorNameBin), + {ConnectorName, ConnectorType} + catch + _:_ -> + throw(not_found) + end. + multi_validate_referenced_connectors(Configs) -> Pipeline = lists:map( diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 188ac9f17..079aa1c64 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -44,7 +44,7 @@ bridge_v2_structs() -> required => false } )}, - {azure_event_hub, + {azure_event_hub_producer, mk( hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)), #{ @@ -57,7 +57,7 @@ bridge_v2_structs() -> api_schemas(Method) -> [ api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2") + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 1c0a3957a..df404d9b0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -249,32 +249,42 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> Error end. +make_message(Config, MakeMessageFun) -> + BridgeType = ?config(bridge_type, Config), + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + {BridgeId, MakeMessageFun()}; + false -> + {send_message, MakeMessageFun()} + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> - ResourceId = resource_id(Config), ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, MakeMessageFun()}, + Message = make_message(Config, MakeMessageFun), IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)), ok end, fun(Trace) -> + ResourceId = resource_id(Config), ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) end ), ok. t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> - ResourceId = resource_id(Config), ReplyFun = fun(Pid, Result) -> Pid ! {result, Result} @@ -282,12 +292,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, MakeMessageFun()}, + Message = make_message(Config, MakeMessageFun), ?assertMatch( {ok, {ok, _}}, ?wait_async_action( @@ -301,6 +312,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ok end, fun(Trace) -> + ResourceId = resource_id(Config), ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) end ), @@ -342,7 +354,6 @@ t_start_stop(Config, StopTracePoint) -> t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin %% Check that the bridge probe API doesn't leak atoms. @@ -365,6 +376,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> ?assertEqual(AtomsBefore, AtomsAfter), ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -428,6 +440,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> ok end, fun(Trace) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% one for each probe, two for real ?assertMatch( [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], @@ -445,9 +458,9 @@ t_on_get_status(Config, Opts) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), FailureStatus = maps:get(failure_status, Opts, disconnected), ?assertMatch({ok, _}, create_bridge(Config)), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 0d14af9b4..6e15887c8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -120,16 +120,16 @@ setup_mocks() -> meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()), catch meck:new(emqx_bridge_v2, MeckOpts), - meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), + BridgeType = bridge_type(), + BridgeTypeBin = atom_to_binary(BridgeType), + meck:expect( + emqx_bridge_v2, + bridge_v2_type_to_connector_type, + fun(Type) when Type =:= BridgeType; Type =:= BridgeTypeBin -> con_type() end + ), meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), - IsBridgeV2TypeFun = fun(Type) -> - BridgeV2Type = bridge_type(), - case Type of - BridgeV2Type -> true; - _ -> false - end - end, - meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun), + + meck:expect(emqx_bridge_v2, is_bridge_v2_type, fun(Type) -> Type =:= BridgeType end), ok. init_per_suite(Config) -> @@ -519,8 +519,8 @@ t_load_no_matching_connector(_Config) -> {error, {post_config_update, _HandlerMod, #{ bridge_name := my_test_bridge_update, - connector_name := unknown, - type := _, + connector_name := <<"unknown">>, + bridge_type := _, reason := "connector_not_found_or_wrong_type" }}}, update_root_config(RootConf0) @@ -536,8 +536,8 @@ t_load_no_matching_connector(_Config) -> {error, {post_config_update, _HandlerMod, #{ bridge_name := my_test_bridge_new, - connector_name := unknown, - type := _, + connector_name := <<"unknown">>, + bridge_type := _, reason := "connector_not_found_or_wrong_type" }}}, update_root_config(RootConf1) @@ -608,7 +608,7 @@ t_create_no_matching_connector(_Config) -> {post_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, - type := _, + bridge_type := _, reason := "connector_not_found_or_wrong_type" }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf) @@ -628,7 +628,7 @@ t_create_wrong_connector_type(_Config) -> {post_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, - type := wrong_type, + bridge_type := wrong_type, reason := "connector_not_found_or_wrong_type" }}}, emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf) @@ -644,7 +644,7 @@ t_update_connector_not_found(_Config) -> {post_config_update, _HandlerMod, #{ bridge_name := _, connector_name := _, - type := _, + bridge_type := _, reason := "connector_not_found_or_wrong_type" }}}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf) diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 7d36c894e..830681def 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -31,8 +31,8 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --define(AEH_CONNECTOR_TYPE, azure_event_hub). --define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>). +-define(AEH_CONNECTOR_TYPE, azure_event_hub_producer). +-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index 229eb1f74..c721cb9e8 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -13,7 +13,6 @@ -define(BRIDGE_TYPE, azure_event_hub_producer). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka). --define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]). -import(emqx_common_test_helpers, [on_exit/1]). @@ -22,9 +21,7 @@ %%------------------------------------------------------------------------------ all() -> - %TODO: fix tests - %emqx_common_test_helpers:all(?MODULE). - []. + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"), @@ -43,6 +40,7 @@ init_per_suite(Config) -> emqx_resource, emqx_bridge_azure_event_hub, emqx_bridge, + emqx_rule_engine, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => ?config(priv_dir, Config)} @@ -283,8 +281,6 @@ t_sync_query(Config) -> t_same_name_azure_kafka_bridges(AehConfig) -> ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}), BridgeName = ?config(bridge_name, AehConfig), - AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), - KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka), TracePoint = emqx_bridge_kafka_impl_producer_sync_query, %% creates the AEH bridge and check it's working ok = emqx_bridge_testlib:t_sync_query( @@ -295,6 +291,8 @@ t_same_name_azure_kafka_bridges(AehConfig) -> ), %% than creates a Kafka bridge with same name and delete it after creation ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka), + AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), + KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka), %% check that both bridges are healthy ?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)), @@ -309,7 +307,8 @@ t_same_name_azure_kafka_bridges(AehConfig) -> % check that AEH bridge is still working ?check_trace( begin - Message = {send_message, make_message()}, + BridgeId = emqx_bridge_v2_testlib:bridge_id(AehConfig), + Message = {BridgeId, make_message()}, ?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)), ok end, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index decbc1ed3..206cc08e0 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -10,10 +10,11 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(BRIDGE_TYPE, azure_event_hub). --define(BRIDGE_TYPE_BIN, <<"azure_event_hub">>). +-define(BRIDGE_TYPE, azure_event_hub_producer). +-define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). +-define(CONNECTOR_TYPE, azure_event_hub_producer). +-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka_producer). --define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]). -import(emqx_common_test_helpers, [on_exit/1]). @@ -41,6 +42,7 @@ init_per_suite(Config) -> emqx_resource, emqx_bridge_azure_event_hub, emqx_bridge, + emqx_rule_engine, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => ?config(priv_dir, Config)} @@ -88,7 +90,7 @@ common_init_per_testcase(TestCase, Config) -> ok = snabbkaffe:start_trace(), ExtraConfig ++ [ - {connector_type, ?BRIDGE_TYPE}, + {connector_type, ?CONNECTOR_TYPE}, {connector_name, Name}, {connector_config, ConnectorConfig}, {bridge_type, ?BRIDGE_TYPE}, @@ -156,7 +158,7 @@ connector_config(Name, KafkaHost, KafkaPort) -> parse_and_check_connector_config(InnerConfigMap, Name). parse_and_check_connector_config(InnerConfigMap, Name) -> - TypeBin = ?BRIDGE_TYPE_BIN, + TypeBin = ?CONNECTOR_TYPE_BIN, RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}}, #{<<"connectors">> := #{TypeBin := #{Name := Config}}} = hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index a048b327e..c07856c55 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -23,7 +23,7 @@ resource_type(Type) when is_binary(Type) -> resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; %% We use AEH's Kafka interface. -resource_type(azure_event_hub) -> +resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -31,7 +31,7 @@ resource_type(Type) -> %% For connectors that need to override connector configurations. connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> connector_impl_module(binary_to_atom(ConnectorType, utf8)); -connector_impl_module(azure_event_hub) -> +connector_impl_module(azure_event_hub_producer) -> emqx_bridge_azure_event_hub; connector_impl_module(_ConnectorType) -> undefined. @@ -49,7 +49,7 @@ connector_structs() -> required => false } )}, - {azure_event_hub, + {azure_event_hub_producer, mk( hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")), #{ @@ -82,7 +82,7 @@ api_schemas(Method) -> %% We need to map the `type' field of a request (binary) to a %% connector schema module. api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector") + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector") ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index ab2213793..d006d27c0 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -57,7 +57,7 @@ enterprise_fields_connectors() -> []. -endif. connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; -connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub]. +connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. actions_config_name() -> <<"bridges_v2">>.