Merge pull request #10811 from thalesmg/fix-flaky-kconsumer-balance-test-r50

test(kafka_consumer): attempt to stabilize cluster tests
This commit is contained in:
Thales Macedo Garitezi 2023-05-24 15:56:28 -03:00 committed by GitHub
commit cc8cebe950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 33 deletions

View File

@ -994,36 +994,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
Assignments Assignments
). ).
setup_group_subscriber_spy(Node) -> setup_group_subscriber_spy_fn() ->
TestPid = self(), TestPid = self(),
ok = erpc:call( fun() ->
Node, ok = meck:new(brod_group_subscriber_v2, [
fun() -> passthrough, no_link, no_history, non_strict
ok = meck:new(brod_group_subscriber_v2, [ ]),
passthrough, no_link, no_history, non_strict ok = meck:expect(
]), brod_group_subscriber_v2,
ok = meck:expect( assignments_received,
brod_group_subscriber_v2, fun(Pid, MemberId, GenerationId, TopicAssignments) ->
assignments_received, ?tp(
fun(Pid, MemberId, GenerationId, TopicAssignments) -> kafka_assignment,
?tp( #{
kafka_assignment, node => node(),
#{ pid => Pid,
node => node(), member_id => MemberId,
pid => Pid, generation_id => GenerationId,
member_id => MemberId, topic_assignments => TopicAssignments
generation_id => GenerationId, }
topic_assignments => TopicAssignments ),
} TestPid !
), {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
TestPid ! meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, end
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) ),
end ok
), end.
ok
end
).
wait_for_cluster_rpc(Node) -> wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after %% need to wait until the config handler is ready after
@ -1067,6 +1064,7 @@ cluster(Config) ->
_ -> _ ->
ct_slave ct_slave
end, end,
ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
Cluster = emqx_common_test_helpers:emqx_cluster( Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core], [core, core],
[ [
@ -1080,6 +1078,7 @@ cluster(Config) ->
{env_handler, fun {env_handler, fun
(emqx) -> (emqx) ->
application:set_env(emqx, boot_modules, [broker, router]), application:set_env(emqx, boot_modules, [broker, router]),
ExtraEnvHandlerHook(),
ok; ok;
(emqx_conf) -> (emqx_conf) ->
ok; ok;
@ -1680,7 +1679,6 @@ t_cluster_group(Config) ->
Nodes Nodes
) )
end), end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe( {ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes), length(Nodes),
@ -1757,7 +1755,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N1]), ct:pal("stopping ~p", [N1]),
ok = emqx_common_test_helpers:stop_slave(N1) ok = emqx_common_test_helpers:stop_slave(N1)
end), end),
setup_group_subscriber_spy(N1),
{{ok, _}, {ok, _}} = {{ok, _}, {ok, _}} =
?wait_async_action( ?wait_async_action(
erpc:call(N1, fun() -> erpc:call(N1, fun() ->
@ -1801,7 +1798,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N2]), ct:pal("stopping ~p", [N2]),
ok = emqx_common_test_helpers:stop_slave(N2) ok = emqx_common_test_helpers:stop_slave(N2)
end), end),
setup_group_subscriber_spy(N2),
Nodes = [N1, N2], Nodes = [N1, N2],
wait_for_cluster_rpc(N2), wait_for_cluster_rpc(N2),
@ -1902,7 +1898,6 @@ t_cluster_node_down(Config) ->
Nodes Nodes
) )
end), end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe( {ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes), length(Nodes),