From ffca5812293da368f7878340aa06eba0552df8a6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 11:05:33 -0300 Subject: [PATCH] feat(kafka): add option to configure health check interval Fixes https://emqx.atlassian.net/browse/EMQX-10781 --- .../src/emqx_bridge_kafka.app.src | 2 +- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 14 +++++++++++--- .../test/emqx_bridge_kafka_impl_producer_SUITE.erl | 1 - .../test/emqx_bridge_kafka_tests.erl | 6 ++++++ changes/ee/feat-11459.en.md | 1 + 5 files changed, 19 insertions(+), 5 deletions(-) create mode 100644 changes/ee/feat-11459.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 3792409c6..55b02560b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 544c95b85..6b3f3cd64 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -268,7 +268,8 @@ fields(producer_opts) -> required => true, desc => ?DESC(producer_kafka_opts), validator => fun producer_strategy_key_validator/1 - })} + })}, + {resource_opts, mk(ref(resource_opts), #{default => #{}})} ]; fields(producer_kafka_opts) -> [ @@ -425,7 +426,8 @@ fields(consumer_opts) -> {value_encoding_mode, mk(enum([none, base64]), #{ default => none, desc => ?DESC(consumer_value_encoding_mode) - })} + })}, + {resource_opts, mk(ref(resource_opts), #{default => #{}})} ]; fields(consumer_topic_mapping) -> [ @@ -460,10 +462,16 @@ fields(consumer_kafka_opts) -> emqx_schema:timeout_duration_s(), #{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)} )} - ]. + ]; +fields(resource_opts) -> + SupportedFields = [health_check_interval], + CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), + lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts). desc("config") -> ?DESC("desc_config"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> ["Configuration for Kafka using `GET` method."]; desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 31cd4c66a..d93b6dd7d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -596,7 +596,6 @@ t_send_message_with_headers(Config) -> }, KafkaMsg ), - ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), ?assertEqual([], supervisor:which_children(wolff_client_sup)), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 367423cd4..f476ded39 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -306,6 +306,9 @@ kafka_producer_new_hocon() -> " sndbuf = \"1024KB\"\n" " }\n" " ssl {enable = false, verify = \"verify_peer\"}\n" + " resource_opts {\n" + " health_check_interval = 10s\n" + " }\n" " }\n" "}\n" "". @@ -351,5 +354,8 @@ bridges.kafka_consumer.my_consumer { verify = verify_none server_name_indication = \"auto\" } + resource_opts { + health_check_interval = 10s + } } """. diff --git a/changes/ee/feat-11459.en.md b/changes/ee/feat-11459.en.md new file mode 100644 index 000000000..88b2047c4 --- /dev/null +++ b/changes/ee/feat-11459.en.md @@ -0,0 +1 @@ +Added the option to configure health check interval for Kafka bridges.