From 87b57112df361588c4fb26bde4dd31a19e6ef30e Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 9 Jun 2023 21:43:50 +0300 Subject: [PATCH 1/2] refactor: move tcp keepalive options helper to emqx_utils --- apps/emqx/src/emqx_connection.erl | 33 ++++++++++-------------------- apps/emqx_utils/src/emqx_utils.erl | 23 ++++++++++++++++++++- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 385c20393..1172460ac 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -275,28 +275,17 @@ stats(#state{ async_set_keepalive(Idle, Interval, Probes) -> async_set_keepalive(os:type(), self(), Idle, Interval, Probes). -async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) -> - Options = [ - {keepalive, true}, - {raw, 6, 4, <>}, - {raw, 6, 5, <>}, - {raw, 6, 6, <>} - ], - async_set_socket_options(Pid, Options); -async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) -> - Options = [ - {keepalive, true}, - {raw, 6, 16#10, <>}, - {raw, 6, 16#101, <>}, - {raw, 6, 16#102, <>} - ], - async_set_socket_options(Pid, Options); -async_set_keepalive(OS, _Pid, _Idle, _Interval, _Probes) -> - ?SLOG(warning, #{ - msg => "Unsupported operation: set TCP keepalive", - os => OS - }), - ok. +async_set_keepalive(OS, Pid, Idle, Interval, Probes) -> + case emqx_utils:tcp_keepalive_opts(OS, Idle, Interval, Probes) of + {ok, Options} -> + async_set_socket_options(Pid, Options); + {error, {unsupported_os, OS}} -> + ?SLOG(warning, #{ + msg => "Unsupported operation: set TCP keepalive", + os => OS + }), + ok + end. %% @doc Set custom socket options. %% This API is made async because the call might be originated from diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index c7888cd36..53ed37dee 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -57,7 +57,8 @@ pub_props_to_packet/1, safe_filename/1, diff_lists/3, - merge_lists/3 + merge_lists/3, + tcp_keepalive_opts/4 ]). -export([ @@ -488,6 +489,26 @@ safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) -> safe_to_existing_atom(_Any, _Encoding) -> {error, invalid_type}. +-spec tcp_keepalive_opts(term(), non_neg_integer(), non_neg_integer(), non_neg_integer()) -> + {ok, [{keepalive, true} | {raw, non_neg_integer(), non_neg_integer(), binary()}]} + | {error, {unsupported_os, term()}}. +tcp_keepalive_opts({unix, linux}, Idle, Interval, Probes) -> + {ok, [ + {keepalive, true}, + {raw, 6, 4, <>}, + {raw, 6, 5, <>}, + {raw, 6, 6, <>} + ]}; +tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) -> + {ok, [ + {keepalive, true}, + {raw, 6, 16#10, <>}, + {raw, 6, 16#101, <>}, + {raw, 6, 16#102, <>} + ]}; +tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) -> + {error, {unsupported_os, OS}}. + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ From 48ac94280771e876169f66a27093f877a847894a Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 9 Jun 2023 21:45:41 +0300 Subject: [PATCH 2/2] feat(emqx_bridge_kafka): add tcp keepalive option Closes: EMQX-8725 --- .../src/emqx_bridge_kafka.erl | 11 +++- .../src/emqx_bridge_kafka_impl.erl | 50 ++++++++++++++++++- .../src/emqx_bridge_kafka_impl_consumer.erl | 5 +- .../src/emqx_bridge_kafka_impl_producer.erl | 29 +---------- .../test/emqx_bridge_kafka_tests.erl | 18 +++++++ changes/ee/feat-11003.en.md | 1 + rel/i18n/emqx_bridge_kafka.hocon | 24 ++++++--- 7 files changed, 100 insertions(+), 38 deletions(-) create mode 100644 changes/ee/feat-11003.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 149346e4a..a81d93a1b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -74,7 +74,8 @@ values(common_config) -> socket_opts => #{ sndbuf => <<"1024KB">>, recbuf => <<"1024KB">>, - nodelay => true + nodelay => true, + tcp_keepalive => <<"none">> } }; values(producer) -> @@ -236,7 +237,13 @@ fields(socket_opts) -> importance => ?IMPORTANCE_HIDDEN, desc => ?DESC(socket_nodelay) } - )} + )}, + {tcp_keepalive, + mk(string(), #{ + default => <<"none">>, + desc => ?DESC(socket_tcp_keepalive), + validator => fun emqx_schema:validate_tcp_keepalive/1 + })} ]; fields(producer_opts) -> [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl index 22a67c551..747515d9b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl @@ -8,9 +8,12 @@ -export([ hosts/1, make_client_id/2, - sasl/1 + sasl/1, + socket_opts/1 ]). +-include_lib("emqx/include/logger.hrl"). + %% Parse comma separated host:port list into a [{Host,Port}] list hosts(Hosts) when is_binary(Hosts) -> hosts(binary_to_list(Hosts)); @@ -33,6 +36,51 @@ sasl(#{ }) -> {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}. +%% Extra socket options, such as sndbuf size etc. +socket_opts(Opts) when is_map(Opts) -> + socket_opts(maps:to_list(Opts)); +socket_opts(Opts) when is_list(Opts) -> + socket_opts_loop(Opts, []). + +socket_opts_loop([], Acc) -> + lists:reverse(Acc); +socket_opts_loop([{tcp_keepalive, KeepAlive} | Rest], Acc) -> + Acc1 = tcp_keepalive(KeepAlive) ++ Acc, + socket_opts_loop(Rest, Acc1); +socket_opts_loop([{T, Bytes} | Rest], Acc) when + T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer +-> + Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)], + socket_opts_loop(Rest, Acc1); +socket_opts_loop([Other | Rest], Acc) -> + socket_opts_loop(Rest, [Other | Acc]). + +%% https://www.erlang.org/doc/man/inet.html +%% For TCP it is recommended to have val(buffer) >= val(recbuf) +%% to avoid performance issues because of unnecessary copying. +adjust_socket_buffer(Bytes, Opts) -> + case lists:keytake(buffer, 1, Opts) of + false -> + [{buffer, Bytes} | Opts]; + {value, {buffer, Bytes1}, Acc1} -> + [{buffer, max(Bytes1, Bytes)} | Acc1] + end. + +tcp_keepalive(None) when None =:= "none"; None =:= <<"none">> -> + []; +tcp_keepalive(KeepAlive) -> + {Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(KeepAlive), + case emqx_utils:tcp_keepalive_opts(os:type(), Idle, Interval, Probes) of + {ok, Opts} -> + Opts; + {error, {unsupported_os, OS}} -> + ?SLOG(warning, #{ + msg => "Unsupported operation: set TCP keepalive", + os => OS + }), + [] + end. + to_bin(A) when is_atom(A) -> atom_to_binary(A); to_bin(L) when is_list(L) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..99266fccb 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -131,6 +131,7 @@ on_start(ResourceId, Config) -> offset_commit_interval_seconds := _, offset_reset_policy := _ }, + socket_opts := SocketOpts0, ssl := SSL, topic_mapping := _ } = Config, @@ -144,8 +145,10 @@ on_start(ResourceId, Config) -> Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] end, ClientOpts = add_ssl_opts(ClientOpts0, SSL), + SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0), + ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts], ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID), - case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of + case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of ok -> ?tp( kafka_consumer_client_started, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..ad651a26d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -61,7 +61,7 @@ on_start(InstId, Config) -> connect_timeout => ConnTimeout, client_id => ClientId, request_timeout => MetaReqTimeout, - extra_sock_opts => socket_opts(SocketOpts), + extra_sock_opts => emqx_bridge_kafka_impl:socket_opts(SocketOpts), sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, @@ -309,33 +309,6 @@ do_get_status(Client, KafkaTopic) -> disconnected end. -%% Extra socket options, such as sndbuf size etc. -socket_opts(Opts) when is_map(Opts) -> - socket_opts(maps:to_list(Opts)); -socket_opts(Opts) when is_list(Opts) -> - socket_opts_loop(Opts, []). - -socket_opts_loop([], Acc) -> - lists:reverse(Acc); -socket_opts_loop([{T, Bytes} | Rest], Acc) when - T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer --> - Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)], - socket_opts_loop(Rest, Acc1); -socket_opts_loop([Other | Rest], Acc) -> - socket_opts_loop(Rest, [Other | Acc]). - -%% https://www.erlang.org/doc/man/inet.html -%% For TCP it is recommended to have val(buffer) >= val(recbuf) -%% to avoid performance issues because of unnecessary copying. -adjust_socket_buffer(Bytes, Opts) -> - case lists:keytake(buffer, 1, Opts) of - false -> - [{buffer, Bytes} | Opts]; - {value, {buffer, Bytes1}, Acc1} -> - [{buffer, max(Bytes1, Bytes)} | Acc1] - end. - ssl(#{enable := true} = SSL) -> emqx_tls_lib:to_client_opts(SSL); ssl(_) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 23bcf8e9d..0ccc19778 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -168,6 +168,24 @@ message_key_dispatch_validations_test() -> ), ok. +tcp_keepalive_validation_test_() -> + ProducerConf = parse(kafka_producer_new_hocon()), + ConsumerConf = parse(kafka_consumer_hocon()), + test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++ + test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf). + +test_keepalive_validation(Name, Conf) -> + Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>], + Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>), + Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>), + ValidConfs = [Conf, Conf1, Conf2], + InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>), + InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>), + InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>), + InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2], + [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++ + [?_assertThrow(_, check(C)) || C <- InvalidConfs]. + %%=========================================================================== %% Helper functions %%=========================================================================== diff --git a/changes/ee/feat-11003.en.md b/changes/ee/feat-11003.en.md new file mode 100644 index 000000000..57f3dd3b5 --- /dev/null +++ b/changes/ee/feat-11003.en.md @@ -0,0 +1 @@ +Add an option to configure TCP keepalive in Kafka bridge. diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index ef2e27972..9739cf3fc 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -38,6 +38,24 @@ socket_send_buffer.desc: socket_send_buffer.label: """Socket Send Buffer Size""" +socket_receive_buffer.desc: +"""Fine tune the socket receive buffer. The default value is tuned for high throughput.""" + +socket_receive_buffer.label: +"""Socket Receive Buffer Size""" + +socket_tcp_keepalive.desc: +"""Enable TCP keepalive for Kafka bridge connections. +The value is three comma separated numbers in the format of 'Idle,Interval,Probes' + - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200). + - Interval: The number of seconds between TCP keep-alive probes (Linux default 75). + - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9). +For example "240,30,5" means: TCP keepalive probes are sent after the connection is idle for 240 seconds, and the probes are sent every 30 seconds until a response is received, if it misses 5 consecutive responses, the connection should be closed. +Default: 'none'""" + +socket_tcp_keepalive.label: +"""TCP keepalive options""" + desc_name.desc: """Bridge name, used as a human-readable description of the bridge.""" @@ -56,12 +74,6 @@ consumer_max_batch_bytes.desc: consumer_max_batch_bytes.label: """Fetch Bytes""" -socket_receive_buffer.desc: -"""Fine tune the socket receive buffer. The default value is tuned for high throughput.""" - -socket_receive_buffer.label: -"""Socket Receive Buffer Size""" - consumer_topic_mapping.desc: """Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."""