diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 55ed572cc..5e5f59b09 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -48,6 +48,8 @@ , pmap/2 , pmap/3 , ipv6_probe/2 + , pmap/2 + , pmap/3 ]). -export([ bin2hexstr_A_F/1 diff --git a/test/emqx_node_helpers.erl b/test/emqx_node_helpers.erl index 16d4be000..d1b7a99cf 100644 --- a/test/emqx_node_helpers.erl +++ b/test/emqx_node_helpers.erl @@ -22,7 +22,9 @@ -export([start_slave/1, start_slave/2, - stop_slave/1]). + stop_slave/1, + wait_for_synced_routes/3 + ]). start_slave(Name) -> start_slave(Name, #{}). @@ -81,3 +83,39 @@ setup_node(Node, #{} = Opts) -> , gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []]) ), ok. + +%% Routes are replicated async. +%% Call this function to wait for nodes in the cluster to have the same view +%% for a given topic. +wait_for_synced_routes(Nodes, Topic, Timeout) -> + F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end, + emqx_misc:nolink_apply(F, Timeout). + +do_wait_for_synced_routes(Nodes, Topic) -> + PerNodeView0 = + lists:map( + fun(Node) -> + {rpc:call(Node, emqx_router, match_routes, [Topic]), Node} + end, Nodes), + PerNodeView = lists:keysort(1, PerNodeView0), + case check_consistent_view(PerNodeView) of + {ok, OneView} -> + io:format(user, "~p~n", [OneView]), + ok; + {error, Reason}-> + ct:pal("inconsistent_routes_view ~p", [Reason]), + timer:sleep(10), + do_wait_for_synced_routes(Nodes, Topic) + end. + +check_consistent_view(PerNodeView) -> + check_consistent_view(PerNodeView, []). + +check_consistent_view([], Acc) -> {ok, Acc}; +check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) -> + check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]); +check_consistent_view([{View, Node} | Rest], Acc) -> + check_consistent_view(Rest, [{View, Node} | Acc]). + +add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes]; +add_to_list(Node, Node1) -> [Node, Node1]. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 5e79ee983..2bf0cf048 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -407,6 +407,7 @@ t_local_fallback(_) -> Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}), + ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)), [{share, Topic, {ok, 1}}] = emqx:publish(Message1), {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),