From ebf131266a74a982d3d272e2fdfec254bc77c7c7 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 20 Sep 2022 11:37:28 -0300 Subject: [PATCH] test: fix flaky shared sub test case the route replication is async, added a function to wait for it --- src/emqx_misc.erl | 112 +++++++++++++++++++++++++++++++++ test/emqx_misc_SUITE.erl | 33 ++++++++++ test/emqx_node_helpers.erl | 40 +++++++++++- test/emqx_shared_sub_SUITE.erl | 1 + 4 files changed, 185 insertions(+), 1 deletion(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index bfda5bb38..d256569fb 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -46,6 +46,8 @@ , maybe_parse_ip/1 , ipv6_probe/1 , ipv6_probe/2 + , pmap/2 + , pmap/3 ]). -export([ bin2hexstr_A_F/1 @@ -56,7 +58,13 @@ -export([ is_sane_id/1 ]). +-export([ + nolink_apply/1, + nolink_apply/2 +]). + -define(VALID_STR_RE, "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"). +-define(DEFAULT_PMAP_TIMEOUT, 5000). -spec is_sane_id(list() | binary()) -> ok | {error, Reason::binary()}. is_sane_id(Str) -> @@ -332,6 +340,110 @@ hexchar2int(I) when I >= $0 andalso I =< $9 -> I - $0; hexchar2int(I) when I >= $A andalso I =< $F -> I - $A + 10; hexchar2int(I) when I >= $a andalso I =< $f -> I - $a + 10. +%% @doc Like lists:map/2, only the callback function is evaluated +%% concurrently. +-spec pmap(fun((A) -> B), list(A)) -> list(B). +pmap(Fun, List) when is_function(Fun, 1), is_list(List) -> + pmap(Fun, List, ?DEFAULT_PMAP_TIMEOUT). + +-spec pmap(fun((A) -> B), list(A), timeout()) -> list(B). +pmap(Fun, List, Timeout) when + is_function(Fun, 1), is_list(List), is_integer(Timeout), Timeout >= 0 +-> + nolink_apply(fun() -> do_parallel_map(Fun, List) end, Timeout). + +%% @doc Delegate a function to a worker process. +%% The function may spawn_link other processes but we do not +%% want the caller process to be linked. +%% This is done by isolating the possible link with a not-linked +%% middleman process. +nolink_apply(Fun) -> nolink_apply(Fun, infinity). + +%% @doc Same as `nolink_apply/1', with a timeout. +-spec nolink_apply(function(), timer:timeout()) -> term(). +nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> + Caller = self(), + ResRef = make_ref(), + Middleman = erlang:spawn(make_middleman_fn(Caller, Fun, ResRef)), + receive + {ResRef, {normal, Result}} -> + Result; + {ResRef, {exception, {C, E, S}}} -> + erlang:raise(C, E, S); + {ResRef, {'EXIT', Reason}} -> + exit(Reason) + after Timeout -> + exit(Middleman, kill), + exit(timeout) + end. + +-spec make_middleman_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_middleman_fn(Caller, Fun, ResRef) -> + fun() -> + process_flag(trap_exit, true), + CallerMRef = erlang:monitor(process, Caller), + Worker = erlang:spawn_link(make_worker_fn(Caller, Fun, ResRef)), + receive + {'DOWN', CallerMRef, process, _, _} -> + %% For whatever reason, if the caller is dead, + %% there is no reason to continue + exit(Worker, kill), + exit(normal); + {'EXIT', Worker, normal} -> + exit(normal); + {'EXIT', Worker, Reason} -> + %% worker exited with some reason other than 'normal' + _ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), + exit(normal) + end + end. + +-spec make_worker_fn(pid(), fun(() -> any()), reference()) -> fun(() -> no_return()). +make_worker_fn(Caller, Fun, ResRef) -> + fun() -> + Res = + try + {normal, Fun()} + catch + C:E:S -> + {exception, {C, E, S}} + end, + _ = erlang:send(Caller, {ResRef, Res}), + exit(normal) + end. + +do_parallel_map(Fun, List) -> + Parent = self(), + PidList = lists:map( + fun(Item) -> + erlang:spawn_link( + fun() -> + Res = + try + {normal, Fun(Item)} + catch + C:E:St -> + {exception, {C, E, St}} + end, + Parent ! {self(), Res} + end + ) + end, + List + ), + lists:foldr( + fun(Pid, Acc) -> + receive + {Pid, {normal, Result}} -> + [Result | Acc]; + {Pid, {exception, {C, E, St}}} -> + erlang:raise(C, E, St) + end + end, + [], + PidList + ). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index 0eec55faa..e9dd3e132 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -146,3 +146,36 @@ t_now_to_secs(_) -> t_now_to_ms(_) -> ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). +t_pmap_normal(_) -> + ?assertEqual( + [5, 7, 9], + emqx_misc:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ). + +t_pmap_timeout(_) -> + ?assertExit( + timeout, + emqx_misc:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ). + +t_pmap_exception(_) -> + ?assertError( + foobar, + emqx_misc:pmap( + fun + (error) -> error(foobar); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ). diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index 16d4be000..d1b7a99cf 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -22,7 +22,9 @@ -export([start_slave/1, start_slave/2, - stop_slave/1]). + stop_slave/1, + wait_for_synced_routes/3 + ]). start_slave(Name) -> start_slave(Name, #{}). @@ -81,3 +83,39 @@ setup_node(Node, #{} = Opts) -> , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) ), ok. + +%% Routes are replicated async. +%% Call this function to wait for nodes in the cluster to have the same view +%% for a given topic. +wait_for_synced_routes(Nodes, Topic, Timeout) -> + F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end, + emqx_misc:nolink_apply(F, Timeout). + +do_wait_for_synced_routes(Nodes, Topic) -> + PerNodeView0 = + lists:map( + fun(Node) -> + {rpc:call(Node, emqx_router, match_routes, [Topic]), Node} + end, Nodes), + PerNodeView = lists:keysort(1, PerNodeView0), + case check_consistent_view(PerNodeView) of + {ok, OneView} -> + io:format(user, "~p~n", [OneView]), + ok; + {error, Reason}-> + ct:pal("inconsistent_routes_view ~p", [Reason]), + timer:sleep(10), + do_wait_for_synced_routes(Nodes, Topic) + end. + +check_consistent_view(PerNodeView) -> + check_consistent_view(PerNodeView, []). + +check_consistent_view([], Acc) -> {ok, Acc}; +check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) -> + check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]); +check_consistent_view([{View, Node} | Rest], Acc) -> + check_consistent_view(Rest, [{View, Node} | Acc]). + +add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes]; +add_to_list(Node, Node1) -> [Node, Node1]. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5e79ee983..2bf0cf048 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -407,6 +407,7 @@ t_local_fallback(_) -> Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}), + ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)), [{share, Topic, {ok, 1}}] = emqx:publish(Message1), {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),