diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 99a49f8fd..f5bcb23e2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 1681297ec..8d50f60e3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.11"}, + {vsn, "5.0.12"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 1211fd5e9..01f62e04c 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -1744,7 +1744,18 @@ t_node_joins_existing_cluster(Config) -> setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end), + erpc:call(N1, fun() -> + {ok, _} = create_bridge( + Config, + #{ + <<"kafka">> => + #{ + <<"offset_reset_policy">> => + <<"earliest">> + } + } + ) + end), #{?snk_kind := kafka_consumer_subscriber_started}, 15_000 ), @@ -1775,14 +1786,19 @@ t_node_joins_existing_cluster(Config) -> wait_for_cluster_rpc(N2), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])), + ?retry( + _Sleep1 = 100, + _Attempts1 = 50, + ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])) + ), + %% Give some time for the consumers in both nodes to %% rebalance. {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000), %% Publish some messages so we can check they came from each node. ?retry( - _Sleep1 = 100, - _Attempts1 = 50, + _Sleep2 = 100, + _Attempts2 = 50, true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic]) ), {ok, SRef1} = @@ -1792,7 +1808,7 @@ t_node_joins_existing_cluster(Config) -> ?snk_span := {complete, _} }), NPartitions, - 10_000 + 20_000 ), lists:foreach( fun(N) ->