diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 4f98f33cf..42729c002 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -994,36 +994,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> Assignments ). -setup_group_subscriber_spy(Node) -> +setup_group_subscriber_spy_fn() -> TestPid = self(), - ok = erpc:call( - Node, - fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - TestPid ! - {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok - end - ). + fun() -> + ok = meck:new(brod_group_subscriber_v2, [ + passthrough, no_link, no_history, non_strict + ]), + ok = meck:expect( + brod_group_subscriber_v2, + assignments_received, + fun(Pid, MemberId, GenerationId, TopicAssignments) -> + ?tp( + kafka_assignment, + #{ + node => node(), + pid => Pid, + member_id => MemberId, + generation_id => GenerationId, + topic_assignments => TopicAssignments + } + ), + TestPid ! + {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, + meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) + end + ), + ok + end. wait_for_cluster_rpc(Node) -> %% need to wait until the config handler is ready after @@ -1067,6 +1064,7 @@ cluster(Config) -> _ -> ct_slave end, + ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(), Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ @@ -1080,6 +1078,7 @@ cluster(Config) -> {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), + ExtraEnvHandlerHook(), ok; (emqx_conf) -> ok; @@ -1680,7 +1679,6 @@ t_cluster_group(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes), @@ -1757,7 +1755,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N1]), ok = emqx_common_test_helpers:stop_slave(N1) end), - setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( erpc:call(N1, fun() -> @@ -1801,7 +1798,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N2]), ok = emqx_common_test_helpers:stop_slave(N2) end), - setup_group_subscriber_spy(N2), Nodes = [N1, N2], wait_for_cluster_rpc(N2), @@ -1902,7 +1898,6 @@ t_cluster_node_down(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes),