test(emqx_bridge_azure_event_hub): fix legacy v1 bridge API tests

This commit is contained in:
Stefan Strigler 2023-10-30 11:25:47 +01:00
parent 176bbe88bc
commit b256241650
4 changed files with 27 additions and 14 deletions

View File

@ -249,32 +249,42 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
Error Error
end. end.
make_message(Config, MakeMessageFun) ->
BridgeType = ?config(bridge_type, Config),
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
true ->
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
{BridgeId, MakeMessageFun()};
false ->
{send_message, MakeMessageFun()}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
ResourceId = resource_id(Config),
?check_trace( ?check_trace(
begin begin
?assertMatch({ok, _}, create_bridge_api(Config)), ?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config),
?retry( ?retry(
_Sleep = 1_000, _Sleep = 1_000,
_Attempts = 20, _Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
), ),
Message = {send_message, MakeMessageFun()}, Message = make_message(Config, MakeMessageFun),
IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)), IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ResourceId = resource_id(Config),
?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
end end
), ),
ok. ok.
t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
ResourceId = resource_id(Config),
ReplyFun = ReplyFun =
fun(Pid, Result) -> fun(Pid, Result) ->
Pid ! {result, Result} Pid ! {result, Result}
@ -282,12 +292,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
?check_trace( ?check_trace(
begin begin
?assertMatch({ok, _}, create_bridge_api(Config)), ?assertMatch({ok, _}, create_bridge_api(Config)),
ResourceId = resource_id(Config),
?retry( ?retry(
_Sleep = 1_000, _Sleep = 1_000,
_Attempts = 20, _Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
), ),
Message = {send_message, MakeMessageFun()}, Message = make_message(Config, MakeMessageFun),
?assertMatch( ?assertMatch(
{ok, {ok, _}}, {ok, {ok, _}},
?wait_async_action( ?wait_async_action(
@ -301,6 +312,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ResourceId = resource_id(Config),
?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
end end
), ),
@ -342,7 +354,6 @@ t_start_stop(Config, StopTracePoint) ->
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
?check_trace( ?check_trace(
begin begin
%% Check that the bridge probe API doesn't leak atoms. %% Check that the bridge probe API doesn't leak atoms.
@ -365,6 +376,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
?assertEqual(AtomsBefore, AtomsAfter), ?assertEqual(AtomsBefore, AtomsAfter),
?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
%% Since the connection process is async, we give it some time to %% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness. %% stabilize and avoid flakiness.
@ -428,6 +440,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
%% one for each probe, two for real %% one for each probe, two for real
?assertMatch( ?assertMatch(
[_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
@ -445,9 +458,9 @@ t_on_get_status(Config, Opts) ->
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
FailureStatus = maps:get(failure_status, Opts, disconnected), FailureStatus = maps:get(failure_status, Opts, disconnected),
?assertMatch({ok, _}, create_bridge(Config)), ?assertMatch({ok, _}, create_bridge(Config)),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to %% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness. %% stabilize and avoid flakiness.
?retry( ?retry(

View File

@ -122,6 +122,7 @@ setup_mocks() ->
catch meck:new(emqx_bridge_v2, MeckOpts), catch meck:new(emqx_bridge_v2, MeckOpts),
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()), meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()), meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
IsBridgeV2TypeFun = fun(Type) -> IsBridgeV2TypeFun = fun(Type) ->
BridgeV2Type = bridge_type(), BridgeV2Type = bridge_type(),
case Type of case Type of

View File

@ -13,7 +13,6 @@
-define(BRIDGE_TYPE, azure_event_hub_producer). -define(BRIDGE_TYPE, azure_event_hub_producer).
-define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka). -define(KAFKA_BRIDGE_TYPE, kafka).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -22,9 +21,7 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
all() -> all() ->
%TODO: fix tests emqx_common_test_helpers:all(?MODULE).
%emqx_common_test_helpers:all(?MODULE).
[].
init_per_suite(Config) -> init_per_suite(Config) ->
KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"), KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"),
@ -43,6 +40,7 @@ init_per_suite(Config) ->
emqx_resource, emqx_resource,
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge, emqx_bridge,
emqx_rule_engine,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
], ],
#{work_dir => ?config(priv_dir, Config)} #{work_dir => ?config(priv_dir, Config)}
@ -283,8 +281,6 @@ t_sync_query(Config) ->
t_same_name_azure_kafka_bridges(AehConfig) -> t_same_name_azure_kafka_bridges(AehConfig) ->
ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}), ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
BridgeName = ?config(bridge_name, AehConfig), BridgeName = ?config(bridge_name, AehConfig),
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
TracePoint = emqx_bridge_kafka_impl_producer_sync_query, TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
%% creates the AEH bridge and check it's working %% creates the AEH bridge and check it's working
ok = emqx_bridge_testlib:t_sync_query( ok = emqx_bridge_testlib:t_sync_query(
@ -295,6 +291,8 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
), ),
%% than creates a Kafka bridge with same name and delete it after creation %% than creates a Kafka bridge with same name and delete it after creation
ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka), ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
%% check that both bridges are healthy %% check that both bridges are healthy
?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
@ -309,7 +307,8 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
% check that AEH bridge is still working % check that AEH bridge is still working
?check_trace( ?check_trace(
begin begin
Message = {send_message, make_message()}, BridgeId = emqx_bridge_v2_testlib:bridge_id(AehConfig),
Message = {BridgeId, make_message()},
?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)), ?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)),
ok ok
end, end,

View File

@ -15,7 +15,6 @@
-define(CONNECTOR_TYPE, azure_event_hub_producer). -define(CONNECTOR_TYPE, azure_event_hub_producer).
-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>). -define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka_producer). -define(KAFKA_BRIDGE_TYPE, kafka_producer).
-define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -43,6 +42,7 @@ init_per_suite(Config) ->
emqx_resource, emqx_resource,
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge, emqx_bridge,
emqx_rule_engine,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
], ],
#{work_dir => ?config(priv_dir, Config)} #{work_dir => ?config(priv_dir, Config)}