From 4dd054b0a22889bb1e7c1e5928afab4d12d4bcca Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 3 Nov 2023 11:48:50 +0100 Subject: [PATCH] test(emqx_bridge_kafka_impl_consumer_SUITE): fix flaky --- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 71 ++++++++++++------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 48ff89dd5..af64ddf37 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -698,6 +698,20 @@ create_bridge(Config, Overrides) -> KafkaConfig = emqx_utils_maps:deep_merge(KafkaConfig0, Overrides), emqx_bridge:create(Type, Name, KafkaConfig). +create_bridge_wait_for_balance(Config) -> + setup_group_subscriber_spy(self()), + try + Res = create_bridge(Config), + receive + {kafka_assignment, _, _} -> + Res + after 20_000 -> + ct:fail("timed out waiting for kafka assignment") + end + after + kill_group_subscriber_spy() + end. + delete_bridge(Config) -> Type = ?BRIDGE_TYPE_BIN, Name = ?config(kafka_name, Config), @@ -1020,31 +1034,37 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> setup_group_subscriber_spy_fn() -> TestPid = self(), fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - TestPid ! - {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok + setup_group_subscriber_spy(TestPid) end. +setup_group_subscriber_spy(TestPid) -> + ok = meck:new(brod_group_subscriber_v2, [ + passthrough, no_link, no_history, non_strict + ]), + ok = meck:expect( + brod_group_subscriber_v2, + assignments_received, + fun(Pid, MemberId, GenerationId, TopicAssignments) -> + ?tp( + kafka_assignment, + #{ + node => node(), + pid => Pid, + member_id => MemberId, + generation_id => GenerationId, + topic_assignments => TopicAssignments + } + ), + TestPid ! + {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, + meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) + end + ), + ok. + +kill_group_subscriber_spy() -> + meck:unload(brod_group_subscriber_v2). + wait_for_cluster_rpc(Node) -> %% need to wait until the config handler is ready after %% restarting during the cluster join. @@ -1702,10 +1722,7 @@ t_dynamic_mqtt_topic(Config) -> MQTTTopic = emqx_topic:join([KafkaTopic, '#']), ?check_trace( begin - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + ?assertMatch({ok, _}, create_bridge_wait_for_balance(Config)), wait_until_subscribers_are_ready(NPartitions, 40_000), {ok, C} = emqtt:start_link(), on_exit(fun() -> emqtt:stop(C) end),