Merge remote-tracking branch 'origin/release-v43' into main-v4.3

This commit is contained in:
Zaiming (Stone) Shi 2022-10-02 11:25:16 +02:00
commit 366999ccbb
3 changed files with 42 additions and 1 deletions

View File

@ -48,6 +48,8 @@
, pmap/2
, pmap/3
, ipv6_probe/2
, pmap/2
, pmap/3
]).
-export([ bin2hexstr_A_F/1

View File

@ -22,7 +22,9 @@
-export([start_slave/1,
start_slave/2,
stop_slave/1]).
stop_slave/1,
wait_for_synced_routes/3
]).
start_slave(Name) ->
start_slave(Name, #{}).
@ -81,3 +83,39 @@ setup_node(Node, #{} = Opts) ->
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
),
ok.
%% Routes are replicated async.
%% Call this function to wait for nodes in the cluster to have the same view
%% for a given topic.
wait_for_synced_routes(Nodes, Topic, Timeout) ->
F = fun() -> do_wait_for_synced_routes(Nodes, Topic) end,
emqx_misc:nolink_apply(F, Timeout).
do_wait_for_synced_routes(Nodes, Topic) ->
PerNodeView0 =
lists:map(
fun(Node) ->
{rpc:call(Node, emqx_router, match_routes, [Topic]), Node}
end, Nodes),
PerNodeView = lists:keysort(1, PerNodeView0),
case check_consistent_view(PerNodeView) of
{ok, OneView} ->
io:format(user, "~p~n", [OneView]),
ok;
{error, Reason}->
ct:pal("inconsistent_routes_view ~p", [Reason]),
timer:sleep(10),
do_wait_for_synced_routes(Nodes, Topic)
end.
check_consistent_view(PerNodeView) ->
check_consistent_view(PerNodeView, []).
check_consistent_view([], Acc) -> {ok, Acc};
check_consistent_view([{View, Node} | Rest], [{View, Nodes} | Acc]) ->
check_consistent_view(Rest, [{View, add_to_list(Node, Nodes)} | Acc]);
check_consistent_view([{View, Node} | Rest], Acc) ->
check_consistent_view(Rest, [{View, Node} | Acc]).
add_to_list(Node, Nodes) when is_list(Nodes) -> [Node | Nodes];
add_to_list(Node, Node1) -> [Node, Node1].

View File

@ -407,6 +407,7 @@ t_local_fallback(_) ->
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/", Topic/binary>>, 0}),
ok = emqx_node_helpers:wait_for_synced_routes([node(), Node], Topic, timer:seconds(10)),
[{share, Topic, {ok, 1}}] = emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),