Merge pull request #11003 from SergeTupchiy/EMQX-8725-kafka-bridge-tcp-keepalive

Add kafka bridge tcp keepalive option
This commit is contained in:
SergeTupchiy 2023-06-09 23:06:14 +03:00 committed by GitHub
commit 22df275596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 133 additions and 61 deletions

View File

@ -275,28 +275,17 @@ stats(#state{
%% @doc Request TCP keepalive settings on behalf of the calling process.
%% Detects the OS via os:type() and passes self() as the target pid to
%% async_set_keepalive/5.
%% Idle, Interval and Probes are forwarded unchanged.
async_set_keepalive(Idle, Interval, Probes) ->
    OsType = os:type(),
    Caller = self(),
    async_set_keepalive(OsType, Caller, Idle, Interval, Probes).
%% Build OS-specific keepalive socket options and hand them to
%% async_set_socket_options/2 for Pid.
%% Each of Idle, Interval and Probes is encoded as a 32-bit native-endian
%% integer inside a `raw' socket option.
%% NOTE(review): level 6 is presumably IPPROTO_TCP and the per-OS option
%% numbers presumably select the idle / interval / probe-count settings;
%% confirm against the platform's tcp.h.
async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) ->
    RawOpts = [
        {raw, 6, OptNum, <<Value:32/native>>}
     || {OptNum, Value} <- [{4, Idle}, {5, Interval}, {6, Probes}]
    ],
    async_set_socket_options(Pid, [{keepalive, true} | RawOpts]);
async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) ->
    RawOpts = [
        {raw, 6, OptNum, <<Value:32/native>>}
     || {OptNum, Value} <- [{16#10, Idle}, {16#101, Interval}, {16#102, Probes}]
    ],
    async_set_socket_options(Pid, [{keepalive, true} | RawOpts]);
async_set_keepalive(OS, _Pid, _Idle, _Interval, _Probes) ->
    %% Any other OS: nothing is applied, the request is only logged.
    ?SLOG(warning, #{
        msg => "Unsupported operation: set TCP keepalive",
        os => OS
    }),
    ok.
%% Ask emqx_utils:tcp_keepalive_opts/4 for the keepalive socket options of
%% the given OS. On success they are passed to async_set_socket_options/2
%% for Pid; for an unsupported OS a warning is logged and `ok' is returned.
async_set_keepalive(OS, Pid, Idle, Interval, Probes) ->
    KeepaliveOpts = emqx_utils:tcp_keepalive_opts(OS, Idle, Interval, Probes),
    case KeepaliveOpts of
        {error, {unsupported_os, OS}} ->
            ?SLOG(warning, #{
                msg => "Unsupported operation: set TCP keepalive",
                os => OS
            }),
            ok;
        {ok, SockOpts} ->
            async_set_socket_options(Pid, SockOpts)
    end.
%% @doc Set custom socket options.
%% This API is made async because the call might be originated from

View File

@ -74,7 +74,8 @@ values(common_config) ->
socket_opts => #{
sndbuf => <<"1024KB">>,
recbuf => <<"1024KB">>,
nodelay => true
nodelay => true,
tcp_keepalive => <<"none">>
}
};
values(producer) ->
@ -236,7 +237,13 @@ fields(socket_opts) ->
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(socket_nodelay)
}
)}
)},
{tcp_keepalive,
mk(string(), #{
default => <<"none">>,
desc => ?DESC(socket_tcp_keepalive),
validator => fun emqx_schema:validate_tcp_keepalive/1
})}
];
fields(producer_opts) ->
[

View File

@ -8,9 +8,12 @@
-export([
hosts/1,
make_client_id/2,
sasl/1
sasl/1,
socket_opts/1
]).
-include_lib("emqx/include/logger.hrl").
%% Parse comma separated host:port list into a [{Host,Port}] list
hosts(Hosts) when is_binary(Hosts) ->
hosts(binary_to_list(Hosts));
@ -33,6 +36,51 @@ sasl(#{
}) ->
{callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
%% Convert the configured socket options (a map or a list of entries) into
%% the extra socket option list, e.g. sndbuf / recbuf sizes.
%% A map is first flattened with maps:to_list/1; the actual per-option
%% handling lives in socket_opts_loop/2.
socket_opts(Opts) when is_list(Opts) ->
    socket_opts_loop(Opts, []);
socket_opts(Opts) when is_map(Opts) ->
    socket_opts_loop(maps:to_list(Opts), []).
%% Walk the option list, building the result in reverse and flipping it at
%% the end:
%%  - {tcp_keepalive, V} is replaced by whatever tcp_keepalive/1 returns
%%    (those entries are prepended as a group, so they appear in reverse
%%    order in the final list);
%%  - sndbuf / recbuf / buffer are kept and additionally trigger
%%    adjust_socket_buffer/2 on the options collected so far;
%%  - every other entry is passed through untouched.
socket_opts_loop(Opts, Acc0) ->
    Step = fun
        ({tcp_keepalive, KeepAlive}, Acc) ->
            tcp_keepalive(KeepAlive) ++ Acc;
        ({T, Bytes}, Acc) when T =:= sndbuf; T =:= recbuf; T =:= buffer ->
            [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)];
        (Other, Acc) ->
            [Other | Acc]
    end,
    lists:reverse(lists:foldl(Step, Acc0, Opts)).
%% See https://www.erlang.org/doc/man/inet.html
%% For TCP the recommendation is val(buffer) >= val(recbuf), which avoids
%% performance problems caused by unnecessary copying.
%%
%% Ensures Opts carries a {buffer, N} entry with N >= Bytes: an existing
%% entry is raised to max(Existing, Bytes) and moved to the head of the
%% list; when none exists, {buffer, Bytes} is prepended.
adjust_socket_buffer(Bytes, Opts) ->
    case lists:keyfind(buffer, 1, Opts) of
        {buffer, Current} ->
            Remaining = lists:keydelete(buffer, 1, Opts),
            [{buffer, max(Current, Bytes)} | Remaining];
        false ->
            [{buffer, Bytes} | Opts]
    end.
%% Translate the configured tcp_keepalive value into socket options.
%% The literal "none" (string or binary) yields no options. Any other value
%% is parsed by emqx_schema:parse_tcp_keepalive/1 into {Idle, Interval, Probes}
%% and turned into options for the current OS by emqx_utils:tcp_keepalive_opts/4.
%% On an unsupported OS a warning is logged and no options are returned.
tcp_keepalive("none") ->
    [];
tcp_keepalive(<<"none">>) ->
    [];
tcp_keepalive(KeepAlive) ->
    {Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(KeepAlive),
    Result = emqx_utils:tcp_keepalive_opts(os:type(), Idle, Interval, Probes),
    case Result of
        {error, {unsupported_os, OS}} ->
            ?SLOG(warning, #{
                msg => "Unsupported operation: set TCP keepalive",
                os => OS
            }),
            [];
        {ok, Opts} ->
            Opts
    end.
to_bin(A) when is_atom(A) ->
atom_to_binary(A);
to_bin(L) when is_list(L) ->

View File

@ -131,6 +131,7 @@ on_start(ResourceId, Config) ->
offset_commit_interval_seconds := _,
offset_reset_policy := _
},
socket_opts := SocketOpts0,
ssl := SSL,
topic_mapping := _
} = Config,
@ -144,8 +145,10 @@ on_start(ResourceId, Config) ->
Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
end,
ClientOpts = add_ssl_opts(ClientOpts0, SSL),
SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
ok ->
?tp(
kafka_consumer_client_started,

View File

@ -61,7 +61,7 @@ on_start(InstId, Config) ->
connect_timeout => ConnTimeout,
client_id => ClientId,
request_timeout => MetaReqTimeout,
extra_sock_opts => socket_opts(SocketOpts),
extra_sock_opts => emqx_bridge_kafka_impl:socket_opts(SocketOpts),
sasl => emqx_bridge_kafka_impl:sasl(Auth),
ssl => ssl(SSL)
},
@ -309,33 +309,6 @@ do_get_status(Client, KafkaTopic) ->
disconnected
end.
%% Convert the configured socket options (a map or a list of entries) into
%% the extra socket option list, e.g. sndbuf / recbuf sizes.
%% A map is first flattened with maps:to_list/1; the per-option handling
%% lives in socket_opts_loop/2.
socket_opts(Opts) when is_list(Opts) ->
    socket_opts_loop(Opts, []);
socket_opts(Opts) when is_map(Opts) ->
    socket_opts_loop(maps:to_list(Opts), []).
%% Walk the option list, building the result in reverse and flipping it at
%% the end. sndbuf / recbuf / buffer entries are kept and additionally
%% trigger adjust_socket_buffer/2 on the options collected so far; every
%% other entry is passed through untouched.
socket_opts_loop(Opts, Acc0) ->
    Step = fun
        ({T, Bytes}, Acc) when T =:= sndbuf; T =:= recbuf; T =:= buffer ->
            [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)];
        (Other, Acc) ->
            [Other | Acc]
    end,
    lists:reverse(lists:foldl(Step, Acc0, Opts)).
%% See https://www.erlang.org/doc/man/inet.html
%% For TCP the recommendation is val(buffer) >= val(recbuf), which avoids
%% performance problems caused by unnecessary copying.
%%
%% Ensures Opts carries a {buffer, N} entry with N >= Bytes: an existing
%% entry is raised to max(Existing, Bytes) and moved to the head of the
%% list; when none exists, {buffer, Bytes} is prepended.
adjust_socket_buffer(Bytes, Opts) ->
    case lists:keyfind(buffer, 1, Opts) of
        {buffer, Current} ->
            Remaining = lists:keydelete(buffer, 1, Opts),
            [{buffer, max(Current, Bytes)} | Remaining];
        false ->
            [{buffer, Bytes} | Opts]
    end.
ssl(#{enable := true} = SSL) ->
emqx_tls_lib:to_client_opts(SSL);
ssl(_) ->

View File

@ -168,6 +168,24 @@ message_key_dispatch_validations_test() ->
),
ok.
%% EUnit generator: runs the tcp_keepalive validation cases against both the
%% producer and the consumer bridge configurations.
tcp_keepalive_validation_test_() ->
    Producer = parse(kafka_producer_new_hocon()),
    Consumer = parse(kafka_consumer_hocon()),
    lists:append([
        test_keepalive_validation([<<"kafka">>, <<"myproducer">>], Producer),
        test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], Consumer)
    ]).
%% Build the EUnit test objects for one bridge config.
%% Name is the path segment list identifying the bridge below <<"bridges">>.
%% The untouched Conf and the variants with "5,6,7" / "none" must pass
%% check/1; the variants with "invalid", "5,6" and "5,6,1000" must make
%% check/1 throw.
test_keepalive_validation(Name, Conf) ->
    Path = [<<"bridges">> | Name] ++ [<<"socket_opts">>, <<"tcp_keepalive">>],
    WithKeepalive = fun(Value) ->
        emqx_utils_maps:deep_force_put(Path, Conf, Value)
    end,
    ValidConfs = [Conf | [WithKeepalive(V) || V <- [<<"5,6,7">>, <<"none">>]]],
    InvalidConfs = [WithKeepalive(V) || V <- [<<"invalid">>, <<"5,6">>, <<"5,6,1000">>]],
    [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
        [?_assertThrow(_, check(C)) || C <- InvalidConfs].
%%===========================================================================
%% Helper functions
%%===========================================================================

View File

@ -57,7 +57,8 @@
pub_props_to_packet/1,
safe_filename/1,
diff_lists/3,
merge_lists/3
merge_lists/3,
tcp_keepalive_opts/4
]).
-export([
@ -488,6 +489,26 @@ safe_to_existing_atom(Atom, _Encoding) when is_atom(Atom) ->
safe_to_existing_atom(_Any, _Encoding) ->
{error, invalid_type}.
%% @doc Build the socket options that enable TCP keepalive with the given
%% Idle / Interval / Probes values on the given OS (an os:type()-style tuple).
%% Each value is encoded as a 32-bit native-endian integer in a `raw' option.
%% Returns {error, {unsupported_os, OS}} for anything other than
%% {unix, linux} and {unix, darwin}.
%% NOTE(review): level 6 is presumably IPPROTO_TCP and the per-OS option
%% numbers presumably select the idle / interval / probe-count settings;
%% confirm against the platform's tcp.h.
-spec tcp_keepalive_opts(term(), non_neg_integer(), non_neg_integer(), non_neg_integer()) ->
    {ok, [{keepalive, true} | {raw, non_neg_integer(), non_neg_integer(), binary()}]}
    | {error, {unsupported_os, term()}}.
tcp_keepalive_opts({unix, linux}, Idle, Interval, Probes) ->
    RawOpts = [
        {raw, 6, OptNum, <<Value:32/native>>}
     || {OptNum, Value} <- [{4, Idle}, {5, Interval}, {6, Probes}]
    ],
    {ok, [{keepalive, true} | RawOpts]};
tcp_keepalive_opts({unix, darwin}, Idle, Interval, Probes) ->
    RawOpts = [
        {raw, 6, OptNum, <<Value:32/native>>}
     || {OptNum, Value} <- [{16#10, Idle}, {16#101, Interval}, {16#102, Probes}]
    ],
    {ok, [{keepalive, true} | RawOpts]};
tcp_keepalive_opts(OS, _Idle, _Interval, _Probes) ->
    {error, {unsupported_os, OS}}.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------

View File

@ -0,0 +1 @@
Add an option to configure TCP keepalive in Kafka bridge.

View File

@ -38,6 +38,24 @@ socket_send_buffer.desc:
socket_send_buffer.label:
"""Socket Send Buffer Size"""
socket_receive_buffer.desc:
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
socket_receive_buffer.label:
"""Socket Receive Buffer Size"""
socket_tcp_keepalive.desc:
"""Enable TCP keepalive for Kafka bridge connections.
The value is three comma separated numbers in the format of 'Idle,Interval,Probes'
- Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
- Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
- Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
For example, "240,30,5" means: TCP keepalive probes are sent after the connection has been idle for 240 seconds, and then every 30 seconds until a response is received. If 5 consecutive probes go unanswered, the connection is closed.
Default: 'none'"""
socket_tcp_keepalive.label:
"""TCP keepalive options"""
desc_name.desc:
"""Bridge name, used as a human-readable description of the bridge."""
@ -56,12 +74,6 @@ consumer_max_batch_bytes.desc:
consumer_max_batch_bytes.label:
"""Fetch Bytes"""
socket_receive_buffer.desc:
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
socket_receive_buffer.label:
"""Socket Receive Buffer Size"""
consumer_topic_mapping.desc:
"""Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."""