Merge pull request #10261 from thalesmg/fix-kconsumer-flaky-test-rv50
test: fix flaky kafka consumer test
This commit is contained in:
commit
575e7cad01
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge, [
|
{application, emqx_bridge, [
|
||||||
{description, "EMQX bridges"},
|
{description, "EMQX bridges"},
|
||||||
{vsn, "0.1.13"},
|
{vsn, "0.1.14"},
|
||||||
{registered, [emqx_bridge_sup]},
|
{registered, [emqx_bridge_sup]},
|
||||||
{mod, {emqx_bridge_app, []}},
|
{mod, {emqx_bridge_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_rule_engine, [
|
{application, emqx_rule_engine, [
|
||||||
{description, "EMQX Rule Engine"},
|
{description, "EMQX Rule Engine"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.11"},
|
{vsn, "5.0.12"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
||||||
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},
|
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},
|
||||||
|
|
|
@ -1744,7 +1744,18 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
setup_group_subscriber_spy(N1),
|
setup_group_subscriber_spy(N1),
|
||||||
{{ok, _}, {ok, _}} =
|
{{ok, _}, {ok, _}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end),
|
erpc:call(N1, fun() ->
|
||||||
|
{ok, _} = create_bridge(
|
||||||
|
Config,
|
||||||
|
#{
|
||||||
|
<<"kafka">> =>
|
||||||
|
#{
|
||||||
|
<<"offset_reset_policy">> =>
|
||||||
|
<<"earliest">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
end),
|
||||||
#{?snk_kind := kafka_consumer_subscriber_started},
|
#{?snk_kind := kafka_consumer_subscriber_started},
|
||||||
15_000
|
15_000
|
||||||
),
|
),
|
||||||
|
@ -1775,14 +1786,19 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
wait_for_cluster_rpc(N2),
|
wait_for_cluster_rpc(N2),
|
||||||
|
|
||||||
{ok, _} = snabbkaffe:receive_events(SRef0),
|
{ok, _} = snabbkaffe:receive_events(SRef0),
|
||||||
?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])),
|
?retry(
|
||||||
|
_Sleep1 = 100,
|
||||||
|
_Attempts1 = 50,
|
||||||
|
?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId]))
|
||||||
|
),
|
||||||
|
|
||||||
%% Give some time for the consumers in both nodes to
|
%% Give some time for the consumers in both nodes to
|
||||||
%% rebalance.
|
%% rebalance.
|
||||||
{ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
|
{ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000),
|
||||||
%% Publish some messages so we can check they came from each node.
|
%% Publish some messages so we can check they came from each node.
|
||||||
?retry(
|
?retry(
|
||||||
_Sleep1 = 100,
|
_Sleep2 = 100,
|
||||||
_Attempts1 = 50,
|
_Attempts2 = 50,
|
||||||
true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
|
true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic])
|
||||||
),
|
),
|
||||||
{ok, SRef1} =
|
{ok, SRef1} =
|
||||||
|
@ -1792,7 +1808,7 @@ t_node_joins_existing_cluster(Config) ->
|
||||||
?snk_span := {complete, _}
|
?snk_span := {complete, _}
|
||||||
}),
|
}),
|
||||||
NPartitions,
|
NPartitions,
|
||||||
10_000
|
20_000
|
||||||
),
|
),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(N) ->
|
fun(N) ->
|
||||||
|
|
Loading…
Reference in New Issue