diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 1c0a3957a..df404d9b0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -249,32 +249,42 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> Error end. +make_message(Config, MakeMessageFun) -> + BridgeType = ?config(bridge_type, Config), + case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of + true -> + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + {BridgeId, MakeMessageFun()}; + false -> + {send_message, MakeMessageFun()} + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> - ResourceId = resource_id(Config), ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, MakeMessageFun()}, + Message = make_message(Config, MakeMessageFun), IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)), ok end, fun(Trace) -> + ResourceId = resource_id(Config), ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) end ), ok. t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> - ResourceId = resource_id(Config), ReplyFun = fun(Pid, Result) -> Pid ! {result, Result} @@ -282,12 +292,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ?check_trace( begin ?assertMatch({ok, _}, create_bridge_api(Config)), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - Message = {send_message, MakeMessageFun()}, + Message = make_message(Config, MakeMessageFun), ?assertMatch( {ok, {ok, _}}, ?wait_async_action( @@ -301,6 +312,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ok end, fun(Trace) -> + ResourceId = resource_id(Config), ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) end ), @@ -342,7 +354,6 @@ t_start_stop(Config, StopTracePoint) -> t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin %% Check that the bridge probe API doesn't leak atoms. @@ -365,6 +376,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> ?assertEqual(AtomsBefore, AtomsAfter), ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. @@ -428,6 +440,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> ok end, fun(Trace) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), %% one for each probe, two for real ?assertMatch( [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], @@ -445,9 +458,9 @@ t_on_get_status(Config, Opts) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), ProxyName = ?config(proxy_name, Config), - ResourceId = resource_id(Config), FailureStatus = maps:get(failure_status, Opts, disconnected), ?assertMatch({ok, _}, create_bridge(Config)), + ResourceId = resource_id(Config), %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 0d14af9b4..d3a7b05e4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -122,6 +122,7 @@ setup_mocks() -> catch meck:new(emqx_bridge_v2, MeckOpts), meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), + meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), IsBridgeV2TypeFun = fun(Type) -> BridgeV2Type = bridge_type(), case Type of diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index 229eb1f74..c721cb9e8 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -13,7 +13,6 @@ -define(BRIDGE_TYPE, azure_event_hub_producer). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka). --define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]). -import(emqx_common_test_helpers, [on_exit/1]). @@ -22,9 +21,7 @@ %%------------------------------------------------------------------------------ all() -> - %TODO: fix tests - %emqx_common_test_helpers:all(?MODULE). - []. + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"), @@ -43,6 +40,7 @@ init_per_suite(Config) -> emqx_resource, emqx_bridge_azure_event_hub, emqx_bridge, + emqx_rule_engine, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => ?config(priv_dir, Config)} @@ -283,8 +281,6 @@ t_sync_query(Config) -> t_same_name_azure_kafka_bridges(AehConfig) -> ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}), BridgeName = ?config(bridge_name, AehConfig), - AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), - KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka), TracePoint = emqx_bridge_kafka_impl_producer_sync_query, %% creates the AEH bridge and check it's working ok = emqx_bridge_testlib:t_sync_query( @@ -295,6 +291,8 @@ t_same_name_azure_kafka_bridges(AehConfig) -> ), %% than creates a Kafka bridge with same name and delete it after creation ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka), + AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), + KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka), %% check that both bridges are healthy ?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)), @@ -309,7 +307,8 @@ t_same_name_azure_kafka_bridges(AehConfig) -> % check that AEH bridge is still working ?check_trace( begin - Message = {send_message, make_message()}, + BridgeId = emqx_bridge_v2_testlib:bridge_id(AehConfig), + Message = {BridgeId, make_message()}, ?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)), ok end, diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl index 14a3dc870..206cc08e0 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl @@ -15,7 +15,6 @@ -define(CONNECTOR_TYPE, azure_event_hub_producer). -define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). -define(KAFKA_BRIDGE_TYPE, kafka_producer). --define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]). -import(emqx_common_test_helpers, [on_exit/1]). @@ -43,6 +42,7 @@ init_per_suite(Config) -> emqx_resource, emqx_bridge_azure_event_hub, emqx_bridge, + emqx_rule_engine, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ], #{work_dir => ?config(priv_dir, Config)}