diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 53bc5dddd..c6bc52bdd 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -597,4 +597,14 @@ emqx_ee_bridge_kafka { zh: "偏移重置政策" } } + consumer_offset_commit_interval_seconds { + desc { + en: "Defines the time interval between two OffsetCommitRequest messages." + zh: "定义了两个OffsetCommitRequest消息之间的时间间隔。" + } + label { + en: "Offset Commit Interval" + zh: "偏移承诺间隔" + } + } } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 865a5f64b..89cad3421 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -280,6 +280,11 @@ fields(consumer_kafka_opts) -> mk( enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]), #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} + )}, + {offset_commit_interval_seconds, + mk( + pos_integer(), + #{default => 5, desc => ?DESC(consumer_offset_commit_interval_seconds)} )} ]. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 076d7fd97..a38807f91 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -37,6 +37,7 @@ kafka := #{ max_batch_bytes := emqx_schema:bytesize(), max_rejoin_attempts := non_neg_integer(), + offset_commit_interval_seconds := pos_integer(), offset_reset_policy := offset_reset_policy(), topic := binary() }, @@ -90,6 +91,7 @@ on_start(InstanceId, Config) -> kafka := #{ max_batch_bytes := _, max_rejoin_attempts := _, + offset_commit_interval_seconds := _, offset_reset_policy := _, topic := _ }, @@ -248,6 +250,7 @@ start_subscriber(Config, InstanceId, ClientID) -> kafka := #{ max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, + offset_commit_interval_seconds := OffsetCommitInterval, offset_reset_policy := OffsetResetPolicy, topic := KafkaTopic }, @@ -272,7 +275,10 @@ start_subscriber(Config, InstanceId, ClientID) -> {max_bytes, MaxBatchBytes}, {offset_reset_policy, OffsetResetPolicy} ], - GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}], + GroupConfig = [ + {max_rejoin_attempts, MaxRejoinAttempts}, + {offset_commit_interval_seconds, OffsetCommitInterval} + ], GroupSubscriberConfig = #{ client => ClientID, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index ce5003a8b..086699a07 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -494,6 +494,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " topic = ~s\n" " max_batch_bytes = 896KB\n" " max_rejoin_attempts = 5\n" + " offset_commit_interval_seconds = 3\n" %% todo: matrix this " offset_reset_policy = reset_to_latest\n" " }\n"