From cb34bc5c46ddf5a369a77ecbf50089494eb1ea35 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 13:13:54 -0300 Subject: [PATCH] test(kafka_consumer): attempt to stabilize cluster tests Example failure: https://github.com/emqx/emqx/actions/runs/5070096710/jobs/9105822319#step:7:515 The attempt here is to setup the spy as early as possible, before the bridge starts, so we avoid missing rebalancing events. --- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 61 +++++++++---------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 194ca95d6..8b61b2ee1 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -997,36 +997,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> Assignments ). -setup_group_subscriber_spy(Node) -> +setup_group_subscriber_spy_fn() -> TestPid = self(), - ok = erpc:call( - Node, - fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - TestPid ! - {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok - end - ). + fun() -> + ok = meck:new(brod_group_subscriber_v2, [ + passthrough, no_link, no_history, non_strict + ]), + ok = meck:expect( + brod_group_subscriber_v2, + assignments_received, + fun(Pid, MemberId, GenerationId, TopicAssignments) -> + ?tp( + kafka_assignment, + #{ + node => node(), + pid => Pid, + member_id => MemberId, + generation_id => GenerationId, + topic_assignments => TopicAssignments + } + ), + TestPid ! + {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, + meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) + end + ), + ok + end. wait_for_cluster_rpc(Node) -> %% need to wait until the config handler is ready after @@ -1070,6 +1067,7 @@ cluster(Config) -> _ -> ct_slave end, + ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(), Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ @@ -1083,6 +1081,7 @@ cluster(Config) -> {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), + ExtraEnvHandlerHook(), ok; (emqx_conf) -> ok; @@ -1701,7 +1700,6 @@ t_cluster_group(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes), @@ -1778,7 +1776,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N1]), ok = emqx_common_test_helpers:stop_slave(N1) end), - setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( erpc:call(N1, fun() -> @@ -1822,7 +1819,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N2]), ok = emqx_common_test_helpers:stop_slave(N2) end), - setup_group_subscriber_spy(N2), Nodes = [N1, N2], wait_for_cluster_rpc(N2), @@ -1923,7 +1919,6 @@ t_cluster_node_down(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes),