From 0d36b179c06e8b314acae175412c4ece93623970 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 11:09:58 -0300 Subject: [PATCH 1/3] docs: fix kafka offset reset policy config description --- lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 39b9d48f4..af0afb72b 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -600,9 +600,12 @@ emqx_ee_bridge_kafka { consumer_offset_reset_policy { desc { en: "Defines how the consumers should reset the start offset when " - "a topic partition has and invalid or no initial offset." + "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs)." + " Note that this is not the same as the `begin_offset`, which defines where to start" + " consumption when there is no offset committed yet." zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," - "消费者应如何重置开始偏移量。" + "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" + " 请注意,这与`begin_offset'不同,后者定义了在还没有提交偏移量的情况下从哪里开始消费。" } label { en: "Offset Reset Policy" From 5cf09209cd7485ff82142ef244cca244907f18af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 14:18:00 -0300 Subject: [PATCH 2/3] feat: tie `offset_reset_policy` and `begin_offset` together MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit To make the configuration more intuitive and avoid exposing more parameters to the user, we should: 1) Remove reset_by_subscriber as an enum constructor for `offset_reset_policy`, as that might make the consumer hang indefinitely without manual action. 2) Set the `begin_offset` `brod_consumer` parameter to `earliest` or `latest` depending on the value of `offset_reset_policy`, as that’s probably the user’s intention. 
--- .../i18n/emqx_ee_bridge_kafka.conf | 7 +-- .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 2 +- .../src/emqx_ee_bridge_kafka.erl | 2 +- .../kafka/emqx_bridge_impl_kafka_consumer.erl | 6 ++ .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 55 +++++++++++++++++++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index af0afb72b..5f6a31b39 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -600,12 +600,11 @@ emqx_ee_bridge_kafka { consumer_offset_reset_policy { desc { en: "Defines how the consumers should reset the start offset when " - "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs)." - " Note that this is not the same as the `begin_offset`, which defines where to start" - " consumption when there is no offset committed yet." + "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or" + " when there is no committed offset for the topic-partition yet." 
zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" - " 请注意,这与`begin_offset'不同,后者定义了在还没有提交偏移量的情况下从哪里开始消费。" + " 或者当主题分区还没有承诺的偏移量时。" } label { en: "Offset Reset Policy" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 6647ec212..156c3eeac 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_bridge, [ {description, "EMQX Enterprise data bridges"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, [emqx_ee_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 3db8dd5f1..61124f1c1 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -370,7 +370,7 @@ fields(consumer_kafka_opts) -> })}, {offset_reset_policy, mk( - enum([reset_to_latest, reset_to_earliest, reset_by_subscriber]), + enum([reset_to_latest, reset_to_earliest]), #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} )}, {offset_commit_interval_seconds, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44633213c..5407bebf7 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -290,7 +290,13 @@ start_consumer(Config, InstanceId, ClientID) -> %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. 
GroupID = consumer_group_id(BridgeName), + BeginOffset = + case OffsetResetPolicy of + reset_to_latest -> latest; + reset_to_earliest -> earliest + end, ConsumerConfig = [ + {begin_offset, BeginOffset}, {max_bytes, MaxBatchBytes}, {offset_reset_policy, OffsetResetPolicy} ], diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 15b4fbe40..37e697add 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -53,6 +53,7 @@ sasl_only_tests() -> %% tests that do not need to be run on all groups only_once_tests() -> [ + t_begin_offset_earliest, t_bridge_rule_action_source, t_cluster_group, t_node_joins_existing_cluster, @@ -1915,3 +1916,57 @@ t_cluster_node_down(Config) -> end ), ok. + +t_begin_offset_earliest(Config) -> + MQTTTopic = ?config(mqtt_topic, Config), + ResourceId = resource_id(Config), + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, C} = emqtt:start_link([{proto_ver, v5}]), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + {ok, _, [2]} = emqtt:subscribe(C, MQTTTopic, 2), + + ?check_trace( + begin + %% publish several messages before the bridge is started. 
+ NumMessages = 5, + lists:foreach( + fun(N) -> + publish(Config, [ + #{ + key => <<"mykey", (integer_to_binary(N))/binary>>, + value => Payload, + headers => [{<<"hkey">>, <<"hvalue">>}] + } + ]) + end, + lists:seq(1, NumMessages) + ), + + {ok, _} = create_bridge(Config, #{ + <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>} + }), + + #{num_published => NumMessages} + end, + fun(Res, _Trace) -> + #{num_published := NumMessages} = Res, + %% we should receive messages published before starting + %% the consumers + Published = receive_published(#{n => NumMessages}), + Payloads = lists:map( + fun(#{payload := P}) -> emqx_json:decode(P, [return_maps]) end, + Published + ), + ?assert( + lists:all( + fun(#{<<"value">> := V}) -> V =:= Payload end, + Payloads + ), + #{payloads => Payloads} + ), + ?assertEqual(NumMessages, emqx_resource_metrics:received_get(ResourceId)), + ok + end + ), + ok. From 69fc1123eeb02e53073912eccfaf73e3851c888e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 27 Mar 2023 15:11:06 -0300 Subject: [PATCH 3/3] refactor: change enum constructors and improve docs --- deploy/docker/Dockerfile | 2 +- .../emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf | 10 ++++------ .../emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl | 6 +++--- .../src/kafka/emqx_bridge_impl_kafka_consumer.erl | 15 ++++++++------- .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 4 ++-- .../test/emqx_ee_bridge_kafka_tests.erl | 2 +- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index f26926bce..bb5c23ea4 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -53,7 +53,7 @@ VOLUME ["/opt/emqx/log", "/opt/emqx/data"] # - 11883 port for internal MQTT/TCP # - 18083 for dashboard and API # - 4370 default Erlang distribution port -# - 5369 for backplain gen_rpc +# - 5369 for backplane gen_rpc EXPOSE 1883 8083 8084 8883 11883 18083 4370 5369 ENTRYPOINT 
["/usr/bin/docker-entrypoint.sh"] diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index 5f6a31b39..787a39fdb 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -599,12 +599,10 @@ emqx_ee_bridge_kafka { } consumer_offset_reset_policy { desc { - en: "Defines how the consumers should reset the start offset when " - "a topic partition has an invalid offset (i.e. when an `OffsetOutOfRange` occurs) or" - " when there is no committed offset for the topic-partition yet." - zh: "定义当一个主题分区的初始偏移量无效或没有初始偏移量时," - "消费者应如何重置开始偏移量。(即当发生 \"OffsetOutOfRange\" 时)。" - " 或者当主题分区还没有承诺的偏移量时。" + en: "Defines from which offset a consumer should start fetching when there" + " is no commit history or when the commit history becomes invalid." + zh: "当主题分区没有偏移量的历史记录,或者历史记录失效后," + "消费者应该使用哪个偏移量重新开始消费。" } label { en: "Offset Reset Policy" diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 61124f1c1..f623417b2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -105,7 +105,7 @@ values(consumer) -> #{ kafka => #{ max_batch_bytes => <<"896KB">>, - offset_reset_policy => <<"reset_to_latest">>, + offset_reset_policy => <<"latest">>, offset_commit_interval_seconds => 5 }, key_encoding_mode => <<"none">>, @@ -370,8 +370,8 @@ fields(consumer_kafka_opts) -> })}, {offset_reset_policy, mk( - enum([reset_to_latest, reset_to_earliest]), - #{default => reset_to_latest, desc => ?DESC(consumer_offset_reset_policy)} + enum([latest, earliest]), + #{default => latest, desc => ?DESC(consumer_offset_reset_policy)} )}, {offset_commit_interval_seconds, mk( diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 
5407bebf7..a05f6ec13 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -59,8 +59,7 @@ subscriber_id := subscriber_id(), kafka_client_id := brod:client_id() }. --type offset_reset_policy() :: reset_to_latest | reset_to_earliest | reset_by_subscriber. -%% -type mqtt_payload() :: full_message | message_value. +-type offset_reset_policy() :: latest | earliest. -type encoding_mode() :: none | base64. -type consumer_init_data() :: #{ hookpoint := binary(), @@ -271,7 +270,7 @@ start_consumer(Config, InstanceId, ClientID) -> max_batch_bytes := MaxBatchBytes, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, - offset_reset_policy := OffsetResetPolicy + offset_reset_policy := OffsetResetPolicy0 }, key_encoding_mode := KeyEncodingMode, topic_mapping := TopicMapping0, @@ -290,10 +289,12 @@ start_consumer(Config, InstanceId, ClientID) -> %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. 
GroupID = consumer_group_id(BridgeName), - BeginOffset = - case OffsetResetPolicy of - reset_to_latest -> latest; - reset_to_earliest -> earliest + %% earliest or latest + BeginOffset = OffsetResetPolicy0, + OffsetResetPolicy = + case OffsetResetPolicy0 of + latest -> reset_to_latest; + earliest -> reset_to_earliest end, ConsumerConfig = [ {begin_offset, BeginOffset}, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 37e697add..8c2ff4e66 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -575,7 +575,7 @@ kafka_config(TestCase, _KafkaType, Config) -> " max_rejoin_attempts = 5\n" " offset_commit_interval_seconds = 3\n" %% todo: matrix this - " offset_reset_policy = reset_to_latest\n" + " offset_reset_policy = latest\n" " }\n" "~s" " key_encoding_mode = none\n" @@ -1944,7 +1944,7 @@ t_begin_offset_earliest(Config) -> ), {ok, _} = create_bridge(Config, #{ - <<"kafka">> => #{<<"offset_reset_policy">> => <<"reset_to_earliest">>} + <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>} }), #{num_published => NumMessages} diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl index 72096c7b1..1b32f856d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_kafka_tests.erl @@ -260,7 +260,7 @@ bridges.kafka_consumer.my_consumer { max_batch_bytes = 896KB max_rejoin_attempts = 5 offset_commit_interval_seconds = 3 - offset_reset_policy = reset_to_latest + offset_reset_policy = latest } topic_mapping = [ {