test(kafka_consumer): attempt to stabilize cluster tests

Example failure:
https://github.com/emqx/emqx/actions/runs/5070096710/jobs/9105822319#step:7:515

The attempt here is to setup the spy as early as possible, before the
bridge starts, so we avoid missing rebalancing events.
This commit is contained in:
Thales Macedo Garitezi 2023-05-24 13:13:54 -03:00
parent ad93af2853
commit 7374e00a02
1 changed files with 28 additions and 33 deletions

View File

@ -994,36 +994,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
Assignments
).
setup_group_subscriber_spy(Node) ->
setup_group_subscriber_spy_fn() ->
TestPid = self(),
ok = erpc:call(
Node,
fun() ->
ok = meck:new(brod_group_subscriber_v2, [
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok
end
).
fun() ->
ok = meck:new(brod_group_subscriber_v2, [
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok
end.
wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after
@ -1067,6 +1064,7 @@ cluster(Config) ->
_ ->
ct_slave
end,
ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
@ -1080,6 +1078,7 @@ cluster(Config) ->
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
ExtraEnvHandlerHook(),
ok;
(emqx_conf) ->
ok;
@ -1680,7 +1679,6 @@ t_cluster_group(Config) ->
Nodes
)
end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes),
@ -1757,7 +1755,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N1]),
ok = emqx_common_test_helpers:stop_slave(N1)
end),
setup_group_subscriber_spy(N1),
{{ok, _}, {ok, _}} =
?wait_async_action(
erpc:call(N1, fun() ->
@ -1801,7 +1798,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N2]),
ok = emqx_common_test_helpers:stop_slave(N2)
end),
setup_group_subscriber_spy(N2),
Nodes = [N1, N2],
wait_for_cluster_rpc(N2),
@ -1902,7 +1898,6 @@ t_cluster_node_down(Config) ->
Nodes
)
end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes),