diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index fb8b19b69..d246b4639 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.0.21"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.0.2-rc.1"). +-define(EMQX_RELEASE_EE, "5.0.2-rc.2"). %% the HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx_bridge/include/emqx_bridge_resource.hrl b/apps/emqx_bridge/include/emqx_bridge_resource.hrl new file mode 100644 index 000000000..fcf1c41a4 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge_resource.hrl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_BRIDGE_RESOURCE_HRL). +-define(EMQX_BRIDGE_RESOURCE_HRL, true). + +-define(BRIDGE_HOOKPOINT(BridgeId), <<"$bridges/", BridgeId/binary>>). + +-endif. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 6426a46b7..b43cbe0ec 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_resource). +-include("emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -23,7 +24,9 @@ resource_id/1, resource_id/2, bridge_id/2, - parse_bridge_id/1 + parse_bridge_id/1, + bridge_hookpoint/1, + bridge_hookpoint_to_bridge_id/1 ]). -export([ @@ -89,6 +92,14 @@ parse_bridge_id(BridgeId) -> ) end. +bridge_hookpoint(BridgeId) -> + <<"$bridges/", (bin(BridgeId))/binary>>. + +bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) -> + {ok, BridgeId}; +bridge_hookpoint_to_bridge_id(_) -> + {error, bad_bridge_hookpoint}. + validate_name(Name0) -> Name = unicode:characters_to_list(Name0, utf8), case is_list(Name) andalso Name =/= [] of @@ -308,7 +319,8 @@ parse_confs(Type, Name, Conf) when ?IS_INGRESS_BRIDGE(Type) -> %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. BId = bridge_id(Type, Name), - Conf#{hookpoint => <<"$bridges/", BId/binary>>, bridge_name => Name}; + BridgeHookpoint = bridge_hookpoint(BId), + Conf#{hookpoint => BridgeHookpoint, bridge_name => Name}; %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. parse_confs(<<"kafka">> = _Type, Name, Conf) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl new file mode 100644 index 000000000..a8a83ff6a --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_resource_tests.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_resource_tests). + +-include_lib("eunit/include/eunit.hrl"). + +bridge_hookpoint_test_() -> + BridgeId = emqx_bridge_resource:bridge_id(type, name), + BridgeHookpoint = emqx_bridge_resource:bridge_hookpoint(BridgeId), + [ + ?_assertEqual(<<"$bridges/type:name">>, BridgeHookpoint), + ?_assertEqual( + {ok, BridgeId}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeHookpoint) + ), + ?_assertEqual( + {error, bad_bridge_hookpoint}, + emqx_bridge_resource:bridge_hookpoint_to_bridge_id(BridgeId) + ) + ]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f15290547..d494a4740 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -213,11 +213,12 @@ get_rules_with_same_event(Topic) -> ]. -spec get_rule_ids_by_action(action_name()) -> [rule_id()]. -get_rule_ids_by_action(ActionName) when is_binary(ActionName) -> +get_rule_ids_by_action(BridgeId) when is_binary(BridgeId) -> [ Id - || #{actions := Acts, id := Id} <- get_rules(), - lists:any(fun(A) -> A =:= ActionName end, Acts) + || #{actions := Acts, id := Id, from := Froms} <- get_rules(), + forwards_to_bridge(Acts, BridgeId) orelse + references_ingress_bridge(Froms, BridgeId) ]; get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) -> {Mod, Fun} = @@ -317,8 +318,14 @@ get_basic_usage_info() -> NumRules = length(EnabledRules), ReferencedBridges = lists:foldl( - fun(#{actions := Actions, from := From}, Acc) -> - BridgeIDs0 = [BridgeID || <<"$bridges/", BridgeID/binary>> <- From], + fun(#{actions := Actions, from := Froms}, Acc) -> + BridgeIDs0 = + [ + BridgeID + || From <- Froms, + {ok, BridgeID} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ], BridgeIDs1 = lists:filter(fun is_binary/1, Actions), tally_referenced_bridges(BridgeIDs0 ++ BridgeIDs1, Acc) end, @@ -478,3 +485,19 @@ contains_actions(Actions, Mod0, Func0) -> end, Actions ). + +forwards_to_bridge(Actions, BridgeId) -> + lists:any(fun(A) -> A =:= BridgeId end, Actions). + +references_ingress_bridge(Froms, BridgeId) -> + lists:any( + fun(ReferenceBridgeId) -> + BridgeId =:= ReferenceBridgeId + end, + [ + RefBridgeId + || From <- Froms, + {ok, RefBridgeId} <- + [emqx_bridge_resource:bridge_hookpoint_to_bridge_id(From)] + ] + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0c962f1fa..7f14f6d8b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge_resource.hrl"). -export([ reload/0, @@ -1011,7 +1012,7 @@ hook_fun_name(HookPoint) -> HookFunName. %% return static function references to help static code checks -hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2; +hook_fun(?BRIDGE_HOOKPOINT(_)) -> fun ?MODULE:on_bridge_message_received/2; hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3; hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4; hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4; @@ -1034,7 +1035,7 @@ ntoa(undefined) -> undefined; ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]); ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)). -event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_name(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_name(<<"$events/client_connected">>) -> 'client.connected'; event_name(<<"$events/client_disconnected">>) -> 'client.disconnected'; event_name(<<"$events/client_connack">>) -> 'client.connack'; @@ -1047,7 +1048,7 @@ event_name(<<"$events/message_dropped">>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. -event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge; +event_topic(?BRIDGE_HOOKPOINT(_) = Bridge) -> Bridge; event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.connack') -> <<"$events/client_connack">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c986cd365..2de013975 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -25,6 +25,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). -define(TMP_RULEID, atom_to_binary(?FUNCTION_NAME)). @@ -198,8 +200,11 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(t_events, Config) -> ets:delete(events_record_tab), - ok = delete_rule(?config(hook_points_rules, Config)); + ok = delete_rule(?config(hook_points_rules, Config)), + emqx_common_test_helpers:call_janitor(), + ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), ok. %%------------------------------------------------------------------------------ @@ -2683,6 +2688,24 @@ t_get_basic_usage_info_1(_Config) -> ), ok. +t_get_rule_ids_by_action_reference_ingress_bridge(_Config) -> + BridgeId = <<"mqtt:ingress">>, + RuleId = <<"rule:ingress_bridge_referenced">>, + {ok, _} = + emqx_rule_engine:create_rule( + #{ + id => RuleId, + sql => <<"select 1 from \"$bridges/", BridgeId/binary, "\"">>, + actions => [#{function => console}] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId) end), + ?assertMatch( + [RuleId], + emqx_rule_engine:get_rule_ids_by_action(BridgeId) + ), + ok. + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10251.en.md b/changes/ce/fix-10251.en.md new file mode 100644 index 000000000..84102f952 --- /dev/null +++ b/changes/ce/fix-10251.en.md @@ -0,0 +1,3 @@ +Consider bridges referenced in `FROM` rule clauses as dependencies. + +Before this fix, when one tried to delete an ingress rule referenced in an action like `select * from "$bridges/mqtt:ingress"`, the UI would not trigger a warning about dependent rule actions. diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index 308c26231..8f04c433c 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"] # - 11883 port for internal MQTT/TCP # - 18083 for dashboard and API # - 4370 default Erlang distribution port -# - 5369 for backplain gen_rpc +# - 5369 for backplane gen_rpc EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"] diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 39b9d48f4..787a39fdb 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -599,10 +599,10 @@ emqx_ee_bridge_kafka { } consumer_offset_reset_policy { desc { - en: "Defines how the consumers should reset the start offset when " - "a topic partition has and invalid or no initial offset." - zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," - "消费者应如何重置开始偏移量。" + en: "Defines from which offset a consumer should start fetching when there" + " is no commit history or when the commit history becomes invalid." + zh: "当没有主题分区没有偏移量的历史记录,或则历史记录失效后," + "消费者应该使用哪个偏移量重新开始消费" } label { en: "Offset Reset Policy" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index d9401b7fd..fd7b3563a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -105,7 +105,7 @@ values(consumer) -> #{ kafka => #{ max_batch_bytes => <<"896KB">>, - offset_reset_policy => <<"reset_to_latest">>, + offset_reset_policy => <<"latest">>, offset_commit_interval_seconds => 5 }, key_encoding_mode => <<"none">>, @@ -370,8 +370,8 @@ fields(consumer_kafka_opts) -> })}, {offset_reset_policy, mk( - enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]), - #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} + enum([latest, earliest]), + #{default => latest, desc => ?DESC(consumer_offset_reset_policy)} )}, {offset_commit_interval_seconds, mk( diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44633213c..a05f6ec13 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -59,8 +59,7 @@ subscriber_id := subscriber_id(), kafka_client_id := brod:client_id() }. --type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber. -%% -type mqtt_payload() :: full_message | message_value. +-type offset_reset_policy() :: latest | earliest. -type encoding_mode() :: none | base64. -type consumer_init_data() :: #{ hookpoint := binary(), @@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) -> max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, - offset_reset_policy := OffsetResetPolicy + offset_reset_policy := OffsetResetPolicy0 }, key_encoding_mode := KeyEncodingMode, topic_mapping := TopicMapping0, @@ -290,7 +289,15 @@ start_consumer(Config, InstanceId, ClientID) -> %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. GroupID = consumer_group_id(BridgeName), + %% earliest or latest + BeginOffset = OffsetResetPolicy0, + OffsetResetPolicy = + case OffsetResetPolicy0 of + latest -> reset_to_latest; + earliest -> reset_to_earliest + end, ConsumerConfig = [ + {begin_offset, BeginOffset}, {max_bytes, MaxBatchBytes}, {offset_reset_policy, OffsetResetPolicy} ], diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index d46f687dd..5703c69f5 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -7,6 +7,7 @@ %% callbacks of behaviour emqx_resource -export([ + is_buffer_supported/0, callback_mode/0, on_start/2, on_stop/2, @@ -26,6 +27,8 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). +is_buffer_supported() -> true. + callback_mode() -> async_if_possible. %% @doc Config schema is defined in emqx_ee_bridge_kafka. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index be6494cb2..02a4c3c3b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -53,6 +53,7 @@ sasl_only_tests() -> %% tests that do not need to be run on all groups only_once_tests() -> [ + t_begin_offset_earliest, t_bridge_rule_action_source, t_cluster_group, t_node_joins_existing_cluster, @@ -274,7 +275,18 @@ init_per_testcase(TestCase, Config) when [{skip_does_not_apply, true}] end; init_per_testcase(TestCase, Config) when - TestCase =:= t_failed_creation_then_fixed; + TestCase =:= t_failed_creation_then_fixed +-> + %% test with one partiton only for this case because + %% the wait probe may not be always sent to the same partition + HasProxy = proplists:get_value(has_proxy, Config, true), + case HasProxy of + false -> + [{skip_does_not_apply, true}]; + true -> + common_init_per_testcase(TestCase, [{num_partitions, 1} | Config]) + end; +init_per_testcase(TestCase, Config) when TestCase =:= t_on_get_status; TestCase =:= t_receive_after_recovery -> @@ -574,7 +586,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " max_rejoin_attempts = 5\n" " offset_commit_interval_seconds = 3\n" %% todo: matrix this - " offset_reset_policy = reset_to_latest\n" + " offset_reset_policy = latest\n" " }\n" "~s" " key_encoding_mode = none\n" @@ -1736,7 +1748,18 @@ t_node_joins_existing_cluster(Config) -> setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( - erpc:call(N1, fun() -> {ok, _} = create_bridge(Config) end), + erpc:call(N1, fun() -> + {ok, _} = create_bridge( + Config, + #{ + <<"kafka">> => + #{ + <<"offset_reset_policy">> => + <<"earliest">> + } + } + ) + end), #{?snk_kind := kafka_consumer_subscriber_started}, 15_000 ), @@ -1767,14 +1790,19 @@ t_node_joins_existing_cluster(Config) -> wait_for_cluster_rpc(N2), {ok, _} = snabbkaffe:receive_events(SRef0), - ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])), + ?retry( + _Sleep1 = 100, + _Attempts1 = 50, + ?assertMatch({ok, _}, erpc:call(N2, emqx_bridge, lookup, [BridgeId])) + ), + %% Give some time for the consumers in both nodes to %% rebalance. {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000), %% Publish some messages so we can check they came from each node. ?retry( - _Sleep1 = 100, - _Attempts1 = 50, + _Sleep2 = 100, + _Attempts2 = 50, true = erpc:call(N2, emqx_router, has_routes, [MQTTTopic]) ), {ok, SRef1} = @@ -1784,7 +1812,7 @@ t_node_joins_existing_cluster(Config) -> ?snk_span := {complete, _} }), NPartitions, - 10_000 + 20_000 ), lists:foreach( fun(N) -> @@ -1919,3 +1947,57 @@ t_cluster_node_down(Config) -> end ), ok. + +t_begin_offset_earliest(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, C} = emqtt:start_link([{proto_ver, v5}]), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2), + + ?check_trace( + begin + %% publish a message before the bridge is started. + NumMessages = 5, + lists:foreach( + fun(N) -> + publish(Config, [ + #{ + key => <<"mykey", (integer_to_binary(N))/binary>>, + value => Payload, + headers => [{<<"hkey">>, <<"hvalue">>}] + } + ]) + end, + lists:seq(1, NumMessages) + ), + + {ok, _} = create_bridge(Config, #{ + <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>} + }), + + #{num_published => NumMessages} + end, + fun(Res, _Trace) -> + #{num_published := NumMessages} = Res, + %% we should receive messages published before starting + %% the consumers + Published = receive_published(#{n => NumMessages}), + Payloads = lists:map( + fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end, + Published + ), + ?assert( + lists:all( + fun(#{<<"value">> := V}) -> V =:= Payload end, + Payloads + ), + #{payloads => Payloads} + ), + ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)), + ok + end + ), + ok. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl index 72096c7b1..1b32f856d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl @@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer { max_batch_bytes = 896KB max_rejoin_attempts = 5 offset_commit_interval_seconds = 3 - offset_reset_policy = reset_to_latest + offset_reset_policy = latest } topic_mapping = [ {