feat(kafka): add option to configure health check interval

Fixes https://emqx.atlassian.net/browse/EMQX-10781
This commit is contained in:
Thales Macedo Garitezi 2023-08-16 11:05:33 -03:00
parent eb878f60b1
commit ffca581229
5 changed files with 19 additions and 5 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_kafka, [ {application, emqx_bridge_kafka, [
{description, "EMQX Enterprise Kafka Bridge"}, {description, "EMQX Enterprise Kafka Bridge"},
{vsn, "0.1.7"}, {vsn, "0.1.8"},
{registered, [emqx_bridge_kafka_consumer_sup]}, {registered, [emqx_bridge_kafka_consumer_sup]},
{applications, [ {applications, [
kernel, kernel,

View File

@ -268,7 +268,8 @@ fields(producer_opts) ->
required => true, required => true,
desc => ?DESC(producer_kafka_opts), desc => ?DESC(producer_kafka_opts),
validator => fun producer_strategy_key_validator/1 validator => fun producer_strategy_key_validator/1
})} })},
{resource_opts, mk(ref(resource_opts), #{default => #{}})}
]; ];
fields(producer_kafka_opts) -> fields(producer_kafka_opts) ->
[ [
@ -425,7 +426,8 @@ fields(consumer_opts) ->
{value_encoding_mode, {value_encoding_mode,
mk(enum([none, base64]), #{ mk(enum([none, base64]), #{
default => none, desc => ?DESC(consumer_value_encoding_mode) default => none, desc => ?DESC(consumer_value_encoding_mode)
})} })},
{resource_opts, mk(ref(resource_opts), #{default => #{}})}
]; ];
fields(consumer_topic_mapping) -> fields(consumer_topic_mapping) ->
[ [
@ -460,10 +462,16 @@ fields(consumer_kafka_opts) ->
emqx_schema:timeout_duration_s(), emqx_schema:timeout_duration_s(),
#{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)} #{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)}
)} )}
]. ];
fields(resource_opts) ->
SupportedFields = [health_check_interval],
CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->
["Configuration for Kafka using `GET` method."]; ["Configuration for Kafka using `GET` method."];
desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" ->

View File

@ -596,7 +596,6 @@ t_send_message_with_headers(Config) ->
}, },
KafkaMsg KafkaMsg
), ),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
%% TODO: refactor those into init/end per testcase %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State), ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)), ?assertEqual([], supervisor:which_children(wolff_client_sup)),

View File

@ -306,6 +306,9 @@ kafka_producer_new_hocon() ->
" sndbuf = \"1024KB\"\n" " sndbuf = \"1024KB\"\n"
" }\n" " }\n"
" ssl {enable = false, verify = \"verify_peer\"}\n" " ssl {enable = false, verify = \"verify_peer\"}\n"
" resource_opts {\n"
" health_check_interval = 10s\n"
" }\n"
" }\n" " }\n"
"}\n" "}\n"
"". "".
@ -351,5 +354,8 @@ bridges.kafka_consumer.my_consumer {
verify = verify_none verify = verify_none
server_name_indication = \"auto\" server_name_indication = \"auto\"
} }
resource_opts {
health_check_interval = 10s
}
} }
""". """.

View File

@ -0,0 +1 @@
Added the option to configure health check interval for Kafka bridges.