Merge pull request #9084 from emqx/0930-fix-flaky-tests
test: fix flaky shared sub tests
This commit is contained in:
commit
c63b8e79d3
|
@ -46,6 +46,8 @@
|
||||||
, maybe_parse_ip/1
|
, maybe_parse_ip/1
|
||||||
, ipv6_probe/1
|
, ipv6_probe/1
|
||||||
, ipv6_probe/2
|
, ipv6_probe/2
|
||||||
|
, pmap/2
|
||||||
|
, pmap/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ bin2hexstr_A_F/1
|
-export([ bin2hexstr_A_F/1
|
||||||
|
@ -56,7 +58,13 @@
|
||||||
-export([ is_sane_id/1
|
-export([ is_sane_id/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
nolink_apply/1,
|
||||||
|
nolink_apply/2
|
||||||
|
]).
|
||||||
|
|
||||||
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
|
-define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$").
|
||||||
|
-define(DEFAULT_PMAP_TIMEOUT, 5000).
|
||||||
|
|
||||||
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
|
-spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}.
|
||||||
is_sane_id(Str) ->
|
is_sane_id(Str) ->
|
||||||
|
@ -332,6 +340,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0;
|
||||||
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
|
hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10;
|
||||||
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
|
hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10.
|
||||||
|
|
||||||
|
%% @doc Like lists:map/2, only the callback function is evaluated
|
||||||
|
%% concurrently.
|
||||||
|
-spec pmap(fun((A) -> B), list(A)) -> list(B).
|
||||||
|
pmap(Fun, List) when is_function(Fun, 1), is_list(List) ->
|
||||||
|
pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT).
|
||||||
|
|
||||||
|
-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B).
|
||||||
|
pmap(Fun, List, Timeout) when
|
||||||
|
is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0
|
||||||
|
->
|
||||||
|
nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout).
|
||||||
|
|
||||||
|
%% @doc Delegate a function to a worker process.
|
||||||
|
%% The function may spawn_link other processes but we do not
|
||||||
|
%% want the caller process to be linked.
|
||||||
|
%% This is done by isolating the possible link with a not-linked
|
||||||
|
%% middleman process.
|
||||||
|
nolink_apply(Fun) -> nolink_apply(Fun, infinity).
|
||||||
|
|
||||||
|
%% @doc Same as `nolink_apply/1', with a timeout.
|
||||||
|
-spec nolink_apply(function(), timer:timeout()) -> term().
|
||||||
|
nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
|
||||||
|
Caller = self(),
|
||||||
|
ResRef = make_ref(),
|
||||||
|
Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)),
|
||||||
|
receive
|
||||||
|
{ResRef, {normal, Result}} ->
|
||||||
|
Result;
|
||||||
|
{ResRef, {exception, {C, E, S}}} ->
|
||||||
|
erlang:raise(C, E, S);
|
||||||
|
{ResRef, {'EXIT', Reason}} ->
|
||||||
|
exit(Reason)
|
||||||
|
after Timeout ->
|
||||||
|
exit(Middleman, kill),
|
||||||
|
exit(timeout)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
|
||||||
|
make_middleman_fn(Caller, Fun, ResRef) ->
|
||||||
|
fun() ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
CallerMRef = erlang:monitor(process, Caller),
|
||||||
|
Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)),
|
||||||
|
receive
|
||||||
|
{'DOWN', CallerMRef, process, _, _} ->
|
||||||
|
%% For whatever reason, if the caller is dead,
|
||||||
|
%% there is no reason to continue
|
||||||
|
exit(Worker, kill),
|
||||||
|
exit(normal);
|
||||||
|
{'EXIT', Worker, normal} ->
|
||||||
|
exit(normal);
|
||||||
|
{'EXIT', Worker, Reason} ->
|
||||||
|
%% worker exited with some reason other than 'normal'
|
||||||
|
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}),
|
||||||
|
exit(normal)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()).
|
||||||
|
make_worker_fn(Caller, Fun, ResRef) ->
|
||||||
|
fun() ->
|
||||||
|
Res =
|
||||||
|
try
|
||||||
|
{normal, Fun()}
|
||||||
|
catch
|
||||||
|
C:E:S ->
|
||||||
|
{exception, {C, E, S}}
|
||||||
|
end,
|
||||||
|
_ = erlang:send(Caller, {ResRef, Res}),
|
||||||
|
exit(normal)
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_parallel_map(Fun, List) ->
|
||||||
|
Parent = self(),
|
||||||
|
PidList = lists:map(
|
||||||
|
fun(Item) ->
|
||||||
|
erlang:spawn_link(
|
||||||
|
fun() ->
|
||||||
|
Res =
|
||||||
|
try
|
||||||
|
{normal, Fun(Item)}
|
||||||
|
catch
|
||||||
|
C:E:St ->
|
||||||
|
{exception, {C, E, St}}
|
||||||
|
end,
|
||||||
|
Parent ! {self(), Res}
|
||||||
|
end
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
List
|
||||||
|
),
|
||||||
|
lists:foldr(
|
||||||
|
fun(Pid, Acc) ->
|
||||||
|
receive
|
||||||
|
{Pid, {normal, Result}} ->
|
||||||
|
[Result | Acc];
|
||||||
|
{Pid, {exception, {C, E, St}}} ->
|
||||||
|
erlang:raise(C, E, St)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
PidList
|
||||||
|
).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
|
|
@ -146,3 +146,36 @@ t_now_to_secs(_) ->
|
||||||
t_now_to_ms(_) ->
|
t_now_to_ms(_) ->
|
||||||
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
|
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
|
||||||
|
|
||||||
|
t_pmap_normal(_) ->
|
||||||
|
?assertEqual(
|
||||||
|
[5, 7, 9],
|
||||||
|
emqx_misc:pmap(
|
||||||
|
fun({A, B}) -> A + B end,
|
||||||
|
[{2, 3}, {3, 4}, {4, 5}]
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_pmap_timeout(_) ->
|
||||||
|
?assertExit(
|
||||||
|
timeout,
|
||||||
|
emqx_misc:pmap(
|
||||||
|
fun
|
||||||
|
(timeout) -> ct:sleep(1000);
|
||||||
|
({A, B}) -> A + B
|
||||||
|
end,
|
||||||
|
[{2, 3}, {3, 4}, timeout],
|
||||||
|
100
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
||||||
|
t_pmap_exception(_) ->
|
||||||
|
?assertError(
|
||||||
|
foobar,
|
||||||
|
emqx_misc:pmap(
|
||||||
|
fun
|
||||||
|
(error) -> error(foobar);
|
||||||
|
({A, B}) -> A + B
|
||||||
|
end,
|
||||||
|
[{2, 3}, {3, 4}, error]
|
||||||
|
)
|
||||||
|
).
|
||||||
|
|
|
@ -22,7 +22,9 @@
|
||||||
|
|
||||||
-export([start_slave/1,
|
-export([start_slave/1,
|
||||||
start_slave/2,
|
start_slave/2,
|
||||||
stop_slave/1]).
|
stop_slave/1,
|
||||||
|
wait_for_synced_routes/3
|
||||||
|
]).
|
||||||
|
|
||||||
start_slave(Name) ->
|
start_slave(Name) ->
|
||||||
start_slave(Name, #{}).
|
start_slave(Name, #{}).
|
||||||
|
@ -81,3 +83,39 @@ setup_node(Node, #{} = Opts) ->
|
||||||
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
|
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Routes are replicated async.
|
||||||
|
%% Call this function to wait for nodes in the cluster to have the same view
|
||||||
|
%% for a given topic.
|
||||||
|
wait_for_synced_routes(Nodes, Topic, Timeout) ->
|
||||||
|
F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end,
|
||||||
|
emqx_misc:nolink_apply(F, Timeout).
|
||||||
|
|
||||||
|
do_wait_for_synced_routes(Nodes, Topic) ->
|
||||||
|
PerNodeView0 =
|
||||||
|
lists:map(
|
||||||
|
fun(Node) ->
|
||||||
|
{rpc:call(Node, emqx_router, match_routes, [Topic]), Node}
|
||||||
|
end, Nodes),
|
||||||
|
PerNodeView = lists:keysort(1, PerNodeView0),
|
||||||
|
case check_consistent_view(PerNodeView) of
|
||||||
|
{ok, OneView} ->
|
||||||
|
io:format(user, "~p~n", [OneView]),
|
||||||
|
ok;
|
||||||
|
{error, Reason}->
|
||||||
|
ct:pal("inconsistent_routes_view ~p", [Reason]),
|
||||||
|
timer:sleep(10),
|
||||||
|
do_wait_for_synced_routes(Nodes, Topic)
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_consistent_view(PerNodeView) ->
|
||||||
|
check_consistent_view(PerNodeView, []).
|
||||||
|
|
||||||
|
check_consistent_view([], Acc) -> {ok, Acc};
|
||||||
|
check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) ->
|
||||||
|
check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]);
|
||||||
|
check_consistent_view([{View, Node} | Rest], Acc) ->
|
||||||
|
check_consistent_view(Rest, [{View, Node} | Acc]).
|
||||||
|
|
||||||
|
add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes];
|
||||||
|
add_to_list(Node, Node1) -> [Node, Node1].
|
||||||
|
|
|
@ -407,6 +407,7 @@ t_local_fallback(_) ->
|
||||||
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
|
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
|
||||||
|
|
||||||
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
|
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
|
||||||
|
ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)),
|
||||||
|
|
||||||
[{share, Topic, {ok, 1}}] = emqx:publish(Message1),
|
[{share, Topic, {ok, 1}}] = emqx:publish(Message1),
|
||||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
|
||||||
|
|
Loading…
Reference in New Issue