test(pulsar_producer): attempt to fix flaky test
This commit is contained in:
parent
770dd188b1
commit
97a9bb484a
|
@ -59,3 +59,27 @@
|
|||
end
|
||||
end)()
|
||||
).
|
||||
|
||||
-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
|
||||
__TEST_CASE = ?FUNCTION_NAME,
|
||||
(fun
|
||||
__GO(__CONFIG, __N) when __N >= NUM_RETRIES ->
|
||||
TEST_BODY_FN(__CONFIG);
|
||||
__GO(__CONFIG, __N) ->
|
||||
try
|
||||
TEST_BODY_FN(__CONFIG)
|
||||
catch
|
||||
__KIND:__REASON:__STACKTRACE ->
|
||||
ct:pal("test errored; will retry\n ~p", [
|
||||
#{kind => __KIND, reason => __REASON, stacktrace => __STACKTRACE}
|
||||
]),
|
||||
end_per_testcase(__TEST_CASE, __CONFIG),
|
||||
garbage_collect(),
|
||||
timer:sleep(1000),
|
||||
__CONFIG1 = init_per_testcase(__TEST_CASE, __CONFIG),
|
||||
__GO(__CONFIG1, __N + 1)
|
||||
end
|
||||
end)(
|
||||
CONFIG, 0
|
||||
)
|
||||
end).
|
||||
|
|
|
@ -724,6 +724,12 @@ start_slave(Name, Opts) when is_map(Opts) ->
|
|||
Node = node_name(Name),
|
||||
put_peer_mod(Node, SlaveMod),
|
||||
Cookie = atom_to_list(erlang:get_cookie()),
|
||||
PrivDataDir = maps:get(priv_data_dir, Opts, "/tmp"),
|
||||
NodeDataDir = filename:join([
|
||||
PrivDataDir,
|
||||
Node,
|
||||
integer_to_list(erlang:unique_integer())
|
||||
]),
|
||||
DoStart =
|
||||
fun() ->
|
||||
case SlaveMod of
|
||||
|
@ -738,7 +744,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
|
|||
{erl_flags, erl_flags()},
|
||||
{env, [
|
||||
{"HOCON_ENV_OVERRIDE_PREFIX", "EMQX_"},
|
||||
{"EMQX_NODE__COOKIE", Cookie}
|
||||
{"EMQX_NODE__COOKIE", Cookie},
|
||||
{"EMQX_NODE__DATA_DIR", NodeDataDir}
|
||||
]}
|
||||
]
|
||||
);
|
||||
|
@ -843,7 +850,12 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
|||
integer_to_list(erlang:unique_integer()),
|
||||
"mnesia"
|
||||
]),
|
||||
erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]),
|
||||
case erpc:call(Node, application, get_env, [mnesia, dir, undefined]) of
|
||||
undefined ->
|
||||
erpc:call(Node, application, set_env, [mnesia, dir, MnesiaDataDir]);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
|
||||
%% Needs to be set explicitly because ekka:start() (which calls `gen`) is called without Handler
|
||||
%% in emqx_common_test_helpers:start_apps(...)
|
||||
|
@ -875,11 +887,9 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
|||
integer_to_list(erlang:unique_integer())
|
||||
]),
|
||||
Cookie = atom_to_list(erlang:get_cookie()),
|
||||
os:putenv("EMQX_NODE__DATA_DIR", NodeDataDir),
|
||||
os:putenv("EMQX_NODE__COOKIE", Cookie),
|
||||
set_env_once("EMQX_NODE__DATA_DIR", NodeDataDir),
|
||||
set_env_once("EMQX_NODE__COOKIE", Cookie),
|
||||
emqx_config:init_load(SchemaMod),
|
||||
os:unsetenv("EMQX_NODE__DATA_DIR"),
|
||||
os:unsetenv("EMQX_NODE__COOKIE"),
|
||||
application:set_env(emqx, init_config_load_done, true)
|
||||
end,
|
||||
|
||||
|
@ -930,6 +940,15 @@ setup_node(Node, Opts) when is_map(Opts) ->
|
|||
|
||||
%% Helpers
|
||||
|
||||
set_env_once(Var, Value) ->
|
||||
case os:getenv(Var) of
|
||||
false ->
|
||||
os:putenv(Var, Value);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
ok.
|
||||
|
||||
put_peer_mod(Node, SlaveMod) ->
|
||||
put({?MODULE, Node}, SlaveMod),
|
||||
ok.
|
||||
|
@ -1289,6 +1308,7 @@ call_janitor() ->
|
|||
call_janitor(Timeout) ->
|
||||
Janitor = get_or_spawn_janitor(),
|
||||
ok = emqx_test_janitor:stop(Janitor, Timeout),
|
||||
erase({?MODULE, janitor_proc}),
|
||||
ok.
|
||||
|
||||
get_or_spawn_janitor() ->
|
||||
|
|
|
@ -60,13 +60,14 @@ init(Parent) ->
|
|||
{ok, #{callbacks => [], owner => Parent}}.
|
||||
|
||||
terminate(_Reason, #{callbacks := Callbacks}) ->
|
||||
do_terminate(Callbacks).
|
||||
_ = do_terminate(Callbacks),
|
||||
ok.
|
||||
|
||||
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
|
||||
{reply, ok, State#{callbacks := [Callback | Callbacks]}};
|
||||
handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
|
||||
do_terminate(Callbacks),
|
||||
{stop, normal, ok, State};
|
||||
FailedCallbacks = do_terminate(Callbacks),
|
||||
{stop, normal, ok, State#{callbacks := FailedCallbacks}};
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, error, State}.
|
||||
|
||||
|
@ -83,17 +84,18 @@ handle_info(_Msg, State) ->
|
|||
%%----------------------------------------------------------------------------------
|
||||
|
||||
do_terminate(Callbacks) ->
|
||||
lists:foreach(
|
||||
fun(Fun) ->
|
||||
lists:foldl(
|
||||
fun(Fun, Failed) ->
|
||||
try
|
||||
Fun()
|
||||
Fun(),
|
||||
Failed
|
||||
catch
|
||||
K:E:S ->
|
||||
ct:pal("error executing callback ~p: ~p", [Fun, {K, E}]),
|
||||
ct:pal("stacktrace: ~p", [S]),
|
||||
ok
|
||||
[Fun | Failed]
|
||||
end
|
||||
end,
|
||||
[],
|
||||
Callbacks
|
||||
),
|
||||
ok.
|
||||
).
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/asserts.hrl").
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
|
@ -148,6 +149,7 @@ end_per_testcase(_Testcase, Config) ->
|
|||
true ->
|
||||
ok;
|
||||
false ->
|
||||
ok = emqx_config:delete_override_conf_files(),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
|
@ -157,6 +159,7 @@ end_per_testcase(_Testcase, Config) ->
|
|||
%% machines struggle with all the containers running...
|
||||
emqx_common_test_helpers:call_janitor(60_000),
|
||||
ok = snabbkaffe:stop(),
|
||||
flush_consumed(),
|
||||
ok
|
||||
end.
|
||||
|
||||
|
@ -373,7 +376,9 @@ start_consumer(TestCase, Config) ->
|
|||
(integer_to_binary(PulsarPort))/binary>>
|
||||
),
|
||||
ConnOpts = #{},
|
||||
ConsumerClientId = TestCase,
|
||||
ConsumerClientId = list_to_atom(
|
||||
atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer())
|
||||
),
|
||||
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
|
||||
SSLOpts = #{
|
||||
enable => UseTLS,
|
||||
|
@ -393,12 +398,12 @@ start_consumer(TestCase, Config) ->
|
|||
cb_init_args => #{send_to => self()},
|
||||
cb_module => pulsar_echo_consumer,
|
||||
sub_type => 'Shared',
|
||||
subscription => atom_to_list(TestCase),
|
||||
subscription => atom_to_list(TestCase) ++ integer_to_list(erlang:unique_integer()),
|
||||
max_consumer_num => 1,
|
||||
%% Note! This must not coincide with the client
|
||||
%% id, or else weird bugs will happen, like the
|
||||
%% consumer never starts...
|
||||
name => test_consumer,
|
||||
name => list_to_atom("test_consumer" ++ integer_to_list(erlang:unique_integer())),
|
||||
consumer_id => 1,
|
||||
conn_opts => ConnOpts
|
||||
},
|
||||
|
@ -440,7 +445,10 @@ wait_until_connected(SupMod, Mod) ->
|
|||
?retry(
|
||||
_Sleep = 300,
|
||||
_Attempts0 = 20,
|
||||
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
|
||||
begin
|
||||
true = length(Pids) > 0,
|
||||
lists:foreach(fun(P) -> {connected, _} = sys:get_state(P) end, Pids)
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
|
@ -483,6 +491,12 @@ receive_consumed(Timeout) ->
|
|||
ct:fail("no message consumed")
|
||||
end.
|
||||
|
||||
flush_consumed() ->
|
||||
receive
|
||||
{pulsar_message, _} -> flush_consumed()
|
||||
after 0 -> ok
|
||||
end.
|
||||
|
||||
try_decode_json(Payload) ->
|
||||
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
||||
{error, _} ->
|
||||
|
@ -1054,31 +1068,44 @@ t_resource_manager_crash_before_producers_started(Config) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
t_cluster(Config) ->
|
||||
MQTTTopic = ?config(mqtt_topic, Config),
|
||||
ResourceId = resource_id(Config),
|
||||
Cluster = cluster(Config),
|
||||
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
QoS = 0,
|
||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
t_cluster(Config0) ->
|
||||
ct:timetrap({seconds, 120}),
|
||||
?retrying(Config0, 3, fun do_t_cluster/1).
|
||||
|
||||
do_t_cluster(Config) ->
|
||||
?check_trace(
|
||||
begin
|
||||
MQTTTopic = ?config(mqtt_topic, Config),
|
||||
ResourceId = resource_id(Config),
|
||||
Cluster = cluster(Config),
|
||||
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
QoS = 0,
|
||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
NumNodes = length(Cluster),
|
||||
{ok, SRef0} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := emqx_bridge_app_started}),
|
||||
NumNodes,
|
||||
25_000
|
||||
),
|
||||
Nodes = [N1, N2 | _] = start_cluster(Cluster),
|
||||
%% wait until bridge app supervisor is up; by that point,
|
||||
%% `emqx_config_handler:add_handler' has been called and the node should be
|
||||
%% ready to create bridges.
|
||||
NumNodes = length(Nodes),
|
||||
{ok, _} = snabbkaffe:block_until(
|
||||
?match_n_events(NumNodes, #{?snk_kind := emqx_bridge_app_started}),
|
||||
15_000
|
||||
),
|
||||
{ok, SRef0} = snabbkaffe:subscribe(
|
||||
{ok, _} = snabbkaffe:receive_events(SRef0),
|
||||
{ok, SRef1} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := pulsar_producer_bridge_started}),
|
||||
NumNodes,
|
||||
15_000
|
||||
25_000
|
||||
),
|
||||
{ok, _} = erpc:call(N1, fun() -> create_bridge(Config) end),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef0),
|
||||
{ok, _} = snabbkaffe:receive_events(SRef1),
|
||||
{ok, _} = snabbkaffe:block_until(
|
||||
?match_n_events(
|
||||
NumNodes,
|
||||
#{?snk_kind := bridge_post_config_update_done}
|
||||
),
|
||||
25_000
|
||||
),
|
||||
lists:foreach(
|
||||
fun(N) ->
|
||||
?retry(
|
||||
|
@ -1095,6 +1122,7 @@ t_cluster(Config) ->
|
|||
),
|
||||
erpc:multicall(Nodes, fun wait_until_producer_connected/0),
|
||||
Message0 = emqx_message:make(ClientId, QoS, MQTTTopic, Payload),
|
||||
?tp(publishing_message, #{}),
|
||||
erpc:call(N2, emqx, publish, [Message0]),
|
||||
|
||||
lists:foreach(
|
||||
|
@ -1108,10 +1136,7 @@ t_cluster(Config) ->
|
|||
Nodes
|
||||
),
|
||||
|
||||
ok
|
||||
end,
|
||||
fun(_Trace) ->
|
||||
Data0 = receive_consumed(10_000),
|
||||
Data0 = receive_consumed(30_000),
|
||||
?assertMatch(
|
||||
[
|
||||
#{
|
||||
|
@ -1123,7 +1148,9 @@ t_cluster(Config) ->
|
|||
],
|
||||
Data0
|
||||
),
|
||||
|
||||
ok
|
||||
end
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
|
Loading…
Reference in New Issue