refactor: change enum constructors and improve docs

commit 69fc1123ee (parent 5cf09209cd)
Author: Thales Macedo Garitezi
Date:   2023-03-27 15:11:06 -03:00

6 changed files with 19 additions and 20 deletions

View File

@@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
 # - 11883 port for internal MQTT/TCP
 # - 18083 for dashboard and API
 # - 4370 default Erlang distribution port
-# - 5369 for backplain gen_rpc
+# - 5369 for backplane gen_rpc
 EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369
 ENTRYPOINT ["/usr/bin/docker-entrypoint.sh"]

View File

@@ -599,12 +599,10 @@ emqx_ee_bridge_kafka {
     }
     consumer_offset_reset_policy {
         desc {
-            en: "Defines how the consumers should reset the start offset when "
-                "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or"
-                " when there is no committed offset for the topic-partition yet."
-            zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时,"
-                "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。"
-                " 或者当主题分区还没有承诺的偏移量时。"
+            en: "Defines from which offset a consumer should start fetching when there"
+                " is no commit history or when the commit history becomes invalid."
+            zh: "当主题分区没有偏移量的历史记录,或者历史记录失效后,"
+                "消费者应该使用哪个偏移量重新开始消费。"
         }
         label {
             en: "Offset Reset Policy"

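In Kafka terms the reworded description matches the consumer's `auto.offset.reset` semantics: when a group has no usable committed offset, `latest` starts fetching at the partition's log-end offset (new messages only), while `earliest` starts at the first offset still retained (full replay). A minimal sketch of that decision, not part of this commit; `resolve_offset/2` and its map keys are hypothetical:

    %% Illustrative only: which concrete offset a consumer starts from
    %% when its committed offset is missing or invalid.
    resolve_offset(latest, #{log_end_offset := End}) -> End;
    resolve_offset(earliest, #{first_offset := First}) -> First.
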
View File

@@ -105,7 +105,7 @@ values(consumer) ->
     #{
         kafka => #{
             max_batch_bytes => <<"896KB">>,
-            offset_reset_policy => <<"reset_to_latest">>,
+            offset_reset_policy => <<"latest">>,
             offset_commit_interval_seconds => 5
         },
         key_encoding_mode => <<"none">>,
@@ -370,8 +370,8 @@ fields(consumer_kafka_opts) ->
             })},
         {offset_reset_policy,
             mk(
-                enum([reset_to_latest, reset_to_earliest]),
-                #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)}
+                enum([latest, earliest]),
+                #{default => latest, desc => ?DESC(consumer_offset_reset_policy)}
             )},
         {offset_commit_interval_seconds,
             mk(

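At the schema layer, `enum([latest, earliest])` means the config now accepts only the bare Kafka-style names; the old `reset_to_*` constructors fail validation. A rough sketch of the effective acceptance rule, with `check_policy/1` as a hypothetical helper rather than the real hocon machinery:

    %% Illustrative only: user input accepted by the new enum.
    check_policy(<<"latest">>) -> {ok, latest};
    check_policy(<<"earliest">>) -> {ok, earliest};
    %% the pre-refactor <<"reset_to_latest">> now ends up here:
    check_policy(Other) -> {error, {invalid_enum_value, Other}}.
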
View File

@@ -59,8 +59,7 @@
     subscriber_id := subscriber_id(),
     kafka_client_id := brod:client_id()
 }.
--type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber.
-%% -type mqtt_payload() :: full_message | message_value.
+-type offset_reset_policy() :: latest | earliest.
 -type encoding_mode() :: none | base64.
 -type consumer_init_data() :: #{
     hookpoint := binary(),
@@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) ->
             max_batch_bytes := MaxBatchBytes,
             max_rejoin_attempts := MaxRejoinAttempts,
             offset_commit_interval_seconds := OffsetCommitInterval,
-            offset_reset_policy := OffsetResetPolicy
+            offset_reset_policy := OffsetResetPolicy0
         },
         key_encoding_mode := KeyEncodingMode,
         topic_mapping := TopicMapping0,
@@ -290,10 +289,12 @@ start_consumer(Config, InstanceId, ClientID) ->
     %% cluster, so that the load gets distributed between all
     %% consumers and we don't repeat messages in the same cluster.
     GroupID = consumer_group_id(BridgeName),
-    BeginOffset =
-        case OffsetResetPolicy of
-            reset_to_latest -> latest;
-            reset_to_earliest -> earliest
-        end,
+    %% earliest or latest
+    BeginOffset = OffsetResetPolicy0,
+    OffsetResetPolicy =
+        case OffsetResetPolicy0 of
+            latest -> reset_to_latest;
+            earliest -> reset_to_earliest
+        end,
     ConsumerConfig = [
         {begin_offset, BeginOffset},

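The refactor makes the user-facing values line up with what brod's consumer config takes for `begin_offset`, so the policy can be passed through unchanged; only brod's own `offset_reset_policy` option, which controls recovery after an `OffsetOutOfRange` error and still uses the `reset_to_*` constructors, needs translating. A condensed sketch of how the two consumer options derive from the one bridge setting, assuming those brod option names:

    %% Illustrative only: both brod options from one bridge setting.
    consumer_opts(Policy) when Policy =:= latest; Policy =:= earliest ->
        [
            %% where to start when no committed offset exists
            {begin_offset, Policy},
            %% how to recover when the current offset becomes invalid
            {offset_reset_policy,
                case Policy of
                    latest -> reset_to_latest;
                    earliest -> reset_to_earliest
                end}
        ].
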
View File

@@ -575,7 +575,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
             "       max_rejoin_attempts = 5\n"
             "       offset_commit_interval_seconds = 3\n"
             %% todo: matrix this
-            "       offset_reset_policy = reset_to_latest\n"
+            "       offset_reset_policy = latest\n"
             "     }\n"
             "~s"
             "       key_encoding_mode = none\n"
@@ -1944,7 +1944,7 @@ t_begin_offset_earliest(Config) ->
     ),
     {ok, _} = create_bridge(Config, #{
-        <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>}
+        <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
     }),
     #{num_published => NumMessages}

View File

@@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer {
     max_batch_bytes = 896KB
     max_rejoin_attempts = 5
     offset_commit_interval_seconds = 3
-    offset_reset_policy = reset_to_latest
+    offset_reset_policy = latest
   }
   topic_mapping = [
     {
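
Net effect: the accepted `offset_reset_policy` values now read the same as Kafka's own `auto.offset.reset` setting (`latest` / `earliest`), as the updated example above shows, instead of leaking brod's internal `reset_to_*` constructor names into the public config.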