test(kafka_consumer): attempt to stabilize cluster tests
Example failure: https://github.com/emqx/emqx/actions/runs/5070096710/jobs/9105822319#step:7:515 The attempt here is to setup the spy as early as possible, before the bridge starts, so we avoid missing rebalancing events.
This commit is contained in:
parent
0ca3f51503
commit
cb34bc5c46
|
@ -997,36 +997,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
|
|||
Assignments
|
||||
).
|
||||
|
||||
setup_group_subscriber_spy(Node) ->
|
||||
setup_group_subscriber_spy_fn() ->
|
||||
TestPid = self(),
|
||||
ok = erpc:call(
|
||||
Node,
|
||||
fun() ->
|
||||
ok = meck:new(brod_group_subscriber_v2, [
|
||||
passthrough, no_link, no_history, non_strict
|
||||
]),
|
||||
ok = meck:expect(
|
||||
brod_group_subscriber_v2,
|
||||
assignments_received,
|
||||
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
|
||||
?tp(
|
||||
kafka_assignment,
|
||||
#{
|
||||
node => node(),
|
||||
pid => Pid,
|
||||
member_id => MemberId,
|
||||
generation_id => GenerationId,
|
||||
topic_assignments => TopicAssignments
|
||||
}
|
||||
),
|
||||
TestPid !
|
||||
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
|
||||
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
|
||||
end
|
||||
),
|
||||
ok
|
||||
end
|
||||
).
|
||||
fun() ->
|
||||
ok = meck:new(brod_group_subscriber_v2, [
|
||||
passthrough, no_link, no_history, non_strict
|
||||
]),
|
||||
ok = meck:expect(
|
||||
brod_group_subscriber_v2,
|
||||
assignments_received,
|
||||
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
|
||||
?tp(
|
||||
kafka_assignment,
|
||||
#{
|
||||
node => node(),
|
||||
pid => Pid,
|
||||
member_id => MemberId,
|
||||
generation_id => GenerationId,
|
||||
topic_assignments => TopicAssignments
|
||||
}
|
||||
),
|
||||
TestPid !
|
||||
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
|
||||
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
|
||||
end
|
||||
),
|
||||
ok
|
||||
end.
|
||||
|
||||
wait_for_cluster_rpc(Node) ->
|
||||
%% need to wait until the config handler is ready after
|
||||
|
@ -1070,6 +1067,7 @@ cluster(Config) ->
|
|||
_ ->
|
||||
ct_slave
|
||||
end,
|
||||
ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
|
||||
Cluster = emqx_common_test_helpers:emqx_cluster(
|
||||
[core, core],
|
||||
[
|
||||
|
@ -1083,6 +1081,7 @@ cluster(Config) ->
|
|||
{env_handler, fun
|
||||
(emqx) ->
|
||||
application:set_env(emqx, boot_modules, [broker, router]),
|
||||
ExtraEnvHandlerHook(),
|
||||
ok;
|
||||
(emqx_conf) ->
|
||||
ok;
|
||||
|
@ -1701,7 +1700,6 @@ t_cluster_group(Config) ->
|
|||
Nodes
|
||||
)
|
||||
end),
|
||||
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
|
||||
{ok, SRef0} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
|
||||
length(Nodes),
|
||||
|
@ -1778,7 +1776,6 @@ t_node_joins_existing_cluster(Config) ->
|
|||
ct:pal("stopping ~p", [N1]),
|
||||
ok = emqx_common_test_helpers:stop_slave(N1)
|
||||
end),
|
||||
setup_group_subscriber_spy(N1),
|
||||
{{ok, _}, {ok, _}} =
|
||||
?wait_async_action(
|
||||
erpc:call(N1, fun() ->
|
||||
|
@ -1822,7 +1819,6 @@ t_node_joins_existing_cluster(Config) ->
|
|||
ct:pal("stopping ~p", [N2]),
|
||||
ok = emqx_common_test_helpers:stop_slave(N2)
|
||||
end),
|
||||
setup_group_subscriber_spy(N2),
|
||||
Nodes = [N1, N2],
|
||||
wait_for_cluster_rpc(N2),
|
||||
|
||||
|
@ -1923,7 +1919,6 @@ t_cluster_node_down(Config) ->
|
|||
Nodes
|
||||
)
|
||||
end),
|
||||
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
|
||||
{ok, SRef0} = snabbkaffe:subscribe(
|
||||
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
|
||||
length(Nodes),
|
||||
|
|
Loading…
Reference in New Issue