feat(kafka_consumer): add `offset_commit_interval_seconds` kafka parameter

This commit is contained in:
Thales Macedo Garitezi 2023-03-01 16:43:34 -03:00
parent 1d5fe14a30
commit e1fdd041b3
4 changed files with 23 additions and 1 deletions

View File

@ -597,4 +597,14 @@ emqx_ee_bridge_kafka {
zh: "偏移重置政策"
}
}
consumer_offset_commit_interval_seconds {
desc {
en: "Defines the time interval between two <code>OffsetCommitRequest</code> messages."
zh: "定义了两个<code>OffsetCommitRequest</code>消息之间的时间间隔。"
}
label {
en: "Offset Commit Interval"
zh: "偏移承诺间隔"
}
}
}

View File

@ -280,6 +280,11 @@ fields(consumer_kafka_opts) ->
mk(
enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]),
#{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
)},
{offset_commit_interval_seconds,
mk(
pos_integer(),
#{default => 5, desc => ?DESC(consumer_offset_commit_interval_seconds)}
)}
].

View File

@ -37,6 +37,7 @@
kafka := #{
max_batch_bytes := emqx_schema:bytesize(),
max_rejoin_attempts := non_neg_integer(),
offset_commit_interval_seconds := pos_integer(),
offset_reset_policy := offset_reset_policy(),
topic := binary()
},
@ -90,6 +91,7 @@ on_start(InstanceId, Config) ->
kafka := #{
max_batch_bytes := _,
max_rejoin_attempts := _,
offset_commit_interval_seconds := _,
offset_reset_policy := _,
topic := _
},
@ -248,6 +250,7 @@ start_subscriber(Config, InstanceId, ClientID) ->
kafka := #{
max_batch_bytes := MaxBatchBytes,
max_rejoin_attempts := MaxRejoinAttempts,
offset_commit_interval_seconds := OffsetCommitInterval,
offset_reset_policy := OffsetResetPolicy,
topic := KafkaTopic
},
@ -272,7 +275,10 @@ start_subscriber(Config, InstanceId, ClientID) ->
{max_bytes, MaxBatchBytes},
{offset_reset_policy, OffsetResetPolicy}
],
GroupConfig = [{max_rejoin_attempts, MaxRejoinAttempts}],
GroupConfig = [
{max_rejoin_attempts, MaxRejoinAttempts},
{offset_commit_interval_seconds, OffsetCommitInterval}
],
GroupSubscriberConfig =
#{
client => ClientID,

View File

@ -494,6 +494,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
" topic = ~s\n"
" max_batch_bytes = 896KB\n"
" max_rejoin_attempts = 5\n"
" offset_commit_interval_seconds = 3\n"
%% todo: matrix this
" offset_reset_policy = reset_to_latest\n"
" }\n"