fix: pulsar flaky cluster tests
This commit is contained in:
parent
4ee44972b2
commit
30c931ae62
|
@ -14,7 +14,7 @@
|
||||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
|
-define(BRIDGE_TYPE_BIN, <<"pulsar_producer">>).
|
||||||
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_pulsar]).
|
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]).
|
||||||
-define(RULE_TOPIC, "mqtt/rule").
|
-define(RULE_TOPIC, "mqtt/rule").
|
||||||
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
|
-define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
|
||||||
|
|
||||||
|
@ -123,10 +123,10 @@ common_init_per_group() ->
|
||||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
%% Ensure enterprise bridge module is loaded
|
%% Ensure enterprise bridge module is loaded
|
||||||
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_resource, emqx_bridge]),
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
_ = application:ensure_all_started(pulsar),
|
ok = emqx_common_test_helpers:start_apps(?APPS),
|
||||||
|
{ok, _} = application:ensure_all_started(pulsar),
|
||||||
_ = emqx_bridge_enterprise:module_info(),
|
_ = emqx_bridge_enterprise:module_info(),
|
||||||
ok = emqx_connector_test_helpers:start_apps(?APPS),
|
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
emqx_mgmt_api_test_util:init_suite(),
|
emqx_mgmt_api_test_util:init_suite(),
|
||||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||||
|
@ -520,7 +520,7 @@ cluster(Config) ->
|
||||||
Cluster = emqx_common_test_helpers:emqx_cluster(
|
Cluster = emqx_common_test_helpers:emqx_cluster(
|
||||||
[core, core],
|
[core, core],
|
||||||
[
|
[
|
||||||
{apps, [emqx_conf, emqx_bridge, emqx_rule_engine, emqx_bridge_pulsar]},
|
{apps, [emqx_conf] ++ ?APPS ++ [pulsar]},
|
||||||
{listener_ports, []},
|
{listener_ports, []},
|
||||||
{peer_mod, PeerModule},
|
{peer_mod, PeerModule},
|
||||||
{priv_data_dir, PrivDataDir},
|
{priv_data_dir, PrivDataDir},
|
||||||
|
@ -1099,6 +1099,7 @@ do_t_cluster(Config) ->
|
||||||
),
|
),
|
||||||
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
|
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
|
||||||
{ok, _} = snabbkaffe:receive_events(SRef1),
|
{ok, _} = snabbkaffe:receive_events(SRef1),
|
||||||
|
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
|
||||||
{ok, _} = snabbkaffe:block_until(
|
{ok, _} = snabbkaffe:block_until(
|
||||||
?match_n_events(
|
?match_n_events(
|
||||||
NumNodes,
|
NumNodes,
|
||||||
|
@ -1120,7 +1121,6 @@ do_t_cluster(Config) ->
|
||||||
end,
|
end,
|
||||||
Nodes
|
Nodes
|
||||||
),
|
),
|
||||||
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
|
|
||||||
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
||||||
?tp(publishing_message, #{}),
|
?tp(publishing_message, #{}),
|
||||||
erpc:call(N2, emqx, publish, [Message0]),
|
erpc:call(N2, emqx, publish, [Message0]),
|
||||||
|
|
Loading…
Reference in New Issue