feat(emqx_bridge_kafka): add tcp keepalive option
Closes: EMQX-8725
This commit is contained in:
parent
87b57112df
commit
48ac942807
|
@ -74,7 +74,8 @@ values(common_config) ->
|
||||||
socket_opts => #{
|
socket_opts => #{
|
||||||
sndbuf => <<"1024KB">>,
|
sndbuf => <<"1024KB">>,
|
||||||
recbuf => <<"1024KB">>,
|
recbuf => <<"1024KB">>,
|
||||||
nodelay => true
|
nodelay => true,
|
||||||
|
tcp_keepalive => <<"none">>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
values(producer) ->
|
values(producer) ->
|
||||||
|
@ -236,7 +237,13 @@ fields(socket_opts) ->
|
||||||
importance => ?IMPORTANCE_HIDDEN,
|
importance => ?IMPORTANCE_HIDDEN,
|
||||||
desc => ?DESC(socket_nodelay)
|
desc => ?DESC(socket_nodelay)
|
||||||
}
|
}
|
||||||
)}
|
)},
|
||||||
|
{tcp_keepalive,
|
||||||
|
mk(string(), #{
|
||||||
|
default => <<"none">>,
|
||||||
|
desc => ?DESC(socket_tcp_keepalive),
|
||||||
|
validator => fun emqx_schema:validate_tcp_keepalive/1
|
||||||
|
})}
|
||||||
];
|
];
|
||||||
fields(producer_opts) ->
|
fields(producer_opts) ->
|
||||||
[
|
[
|
||||||
|
|
|
@ -8,9 +8,12 @@
|
||||||
-export([
|
-export([
|
||||||
hosts/1,
|
hosts/1,
|
||||||
make_client_id/2,
|
make_client_id/2,
|
||||||
sasl/1
|
sasl/1,
|
||||||
|
socket_opts/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
%% Parse comma separated host:port list into a [{Host,Port}] list
|
%% Parse comma separated host:port list into a [{Host,Port}] list
|
||||||
hosts(Hosts) when is_binary(Hosts) ->
|
hosts(Hosts) when is_binary(Hosts) ->
|
||||||
hosts(binary_to_list(Hosts));
|
hosts(binary_to_list(Hosts));
|
||||||
|
@ -33,6 +36,51 @@ sasl(#{
|
||||||
}) ->
|
}) ->
|
||||||
{callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
|
{callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.
|
||||||
|
|
||||||
|
%% Extra socket options, such as sndbuf size etc.
|
||||||
|
socket_opts(Opts) when is_map(Opts) ->
|
||||||
|
socket_opts(maps:to_list(Opts));
|
||||||
|
socket_opts(Opts) when is_list(Opts) ->
|
||||||
|
socket_opts_loop(Opts, []).
|
||||||
|
|
||||||
|
socket_opts_loop([], Acc) ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
socket_opts_loop([{tcp_keepalive, KeepAlive} | Rest], Acc) ->
|
||||||
|
Acc1 = tcp_keepalive(KeepAlive) ++ Acc,
|
||||||
|
socket_opts_loop(Rest, Acc1);
|
||||||
|
socket_opts_loop([{T, Bytes} | Rest], Acc) when
|
||||||
|
T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer
|
||||||
|
->
|
||||||
|
Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)],
|
||||||
|
socket_opts_loop(Rest, Acc1);
|
||||||
|
socket_opts_loop([Other | Rest], Acc) ->
|
||||||
|
socket_opts_loop(Rest, [Other | Acc]).
|
||||||
|
|
||||||
|
%% https://www.erlang.org/doc/man/inet.html
|
||||||
|
%% For TCP it is recommended to have val(buffer) >= val(recbuf)
|
||||||
|
%% to avoid performance issues because of unnecessary copying.
|
||||||
|
adjust_socket_buffer(Bytes, Opts) ->
|
||||||
|
case lists:keytake(buffer, 1, Opts) of
|
||||||
|
false ->
|
||||||
|
[{buffer, Bytes} | Opts];
|
||||||
|
{value, {buffer, Bytes1}, Acc1} ->
|
||||||
|
[{buffer, max(Bytes1, Bytes)} | Acc1]
|
||||||
|
end.
|
||||||
|
|
||||||
|
tcp_keepalive(None) when None =:= "none"; None =:= <<"none">> ->
|
||||||
|
[];
|
||||||
|
tcp_keepalive(KeepAlive) ->
|
||||||
|
{Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(KeepAlive),
|
||||||
|
case emqx_utils:tcp_keepalive_opts(os:type(), Idle, Interval, Probes) of
|
||||||
|
{ok, Opts} ->
|
||||||
|
Opts;
|
||||||
|
{error, {unsupported_os, OS}} ->
|
||||||
|
?SLOG(warning, #{
|
||||||
|
msg => "Unsupported operation: set TCP keepalive",
|
||||||
|
os => OS
|
||||||
|
}),
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
to_bin(A) when is_atom(A) ->
|
to_bin(A) when is_atom(A) ->
|
||||||
atom_to_binary(A);
|
atom_to_binary(A);
|
||||||
to_bin(L) when is_list(L) ->
|
to_bin(L) when is_list(L) ->
|
||||||
|
|
|
@ -131,6 +131,7 @@ on_start(ResourceId, Config) ->
|
||||||
offset_commit_interval_seconds := _,
|
offset_commit_interval_seconds := _,
|
||||||
offset_reset_policy := _
|
offset_reset_policy := _
|
||||||
},
|
},
|
||||||
|
socket_opts := SocketOpts0,
|
||||||
ssl := SSL,
|
ssl := SSL,
|
||||||
topic_mapping := _
|
topic_mapping := _
|
||||||
} = Config,
|
} = Config,
|
||||||
|
@ -144,8 +145,10 @@ on_start(ResourceId, Config) ->
|
||||||
Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
|
Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}]
|
||||||
end,
|
end,
|
||||||
ClientOpts = add_ssl_opts(ClientOpts0, SSL),
|
ClientOpts = add_ssl_opts(ClientOpts0, SSL),
|
||||||
|
SocketOpts = emqx_bridge_kafka_impl:socket_opts(SocketOpts0),
|
||||||
|
ClientOpts1 = [{extra_sock_opts, SocketOpts} | ClientOpts],
|
||||||
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
|
ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID),
|
||||||
case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of
|
case brod:start_client(BootstrapHosts, ClientID, ClientOpts1) of
|
||||||
ok ->
|
ok ->
|
||||||
?tp(
|
?tp(
|
||||||
kafka_consumer_client_started,
|
kafka_consumer_client_started,
|
||||||
|
|
|
@ -61,7 +61,7 @@ on_start(InstId, Config) ->
|
||||||
connect_timeout => ConnTimeout,
|
connect_timeout => ConnTimeout,
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
request_timeout => MetaReqTimeout,
|
request_timeout => MetaReqTimeout,
|
||||||
extra_sock_opts => socket_opts(SocketOpts),
|
extra_sock_opts => emqx_bridge_kafka_impl:socket_opts(SocketOpts),
|
||||||
sasl => emqx_bridge_kafka_impl:sasl(Auth),
|
sasl => emqx_bridge_kafka_impl:sasl(Auth),
|
||||||
ssl => ssl(SSL)
|
ssl => ssl(SSL)
|
||||||
},
|
},
|
||||||
|
@ -309,33 +309,6 @@ do_get_status(Client, KafkaTopic) ->
|
||||||
disconnected
|
disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Extra socket options, such as sndbuf size etc.
|
|
||||||
socket_opts(Opts) when is_map(Opts) ->
|
|
||||||
socket_opts(maps:to_list(Opts));
|
|
||||||
socket_opts(Opts) when is_list(Opts) ->
|
|
||||||
socket_opts_loop(Opts, []).
|
|
||||||
|
|
||||||
socket_opts_loop([], Acc) ->
|
|
||||||
lists:reverse(Acc);
|
|
||||||
socket_opts_loop([{T, Bytes} | Rest], Acc) when
|
|
||||||
T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer
|
|
||||||
->
|
|
||||||
Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)],
|
|
||||||
socket_opts_loop(Rest, Acc1);
|
|
||||||
socket_opts_loop([Other | Rest], Acc) ->
|
|
||||||
socket_opts_loop(Rest, [Other | Acc]).
|
|
||||||
|
|
||||||
%% https://www.erlang.org/doc/man/inet.html
|
|
||||||
%% For TCP it is recommended to have val(buffer) >= val(recbuf)
|
|
||||||
%% to avoid performance issues because of unnecessary copying.
|
|
||||||
adjust_socket_buffer(Bytes, Opts) ->
|
|
||||||
case lists:keytake(buffer, 1, Opts) of
|
|
||||||
false ->
|
|
||||||
[{buffer, Bytes} | Opts];
|
|
||||||
{value, {buffer, Bytes1}, Acc1} ->
|
|
||||||
[{buffer, max(Bytes1, Bytes)} | Acc1]
|
|
||||||
end.
|
|
||||||
|
|
||||||
ssl(#{enable := true} = SSL) ->
|
ssl(#{enable := true} = SSL) ->
|
||||||
emqx_tls_lib:to_client_opts(SSL);
|
emqx_tls_lib:to_client_opts(SSL);
|
||||||
ssl(_) ->
|
ssl(_) ->
|
||||||
|
|
|
@ -168,6 +168,24 @@ message_key_dispatch_validations_test() ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
tcp_keepalive_validation_test_() ->
|
||||||
|
ProducerConf = parse(kafka_producer_new_hocon()),
|
||||||
|
ConsumerConf = parse(kafka_consumer_hocon()),
|
||||||
|
test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
|
||||||
|
test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
|
||||||
|
|
||||||
|
test_keepalive_validation(Name, Conf) ->
|
||||||
|
Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>],
|
||||||
|
Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>),
|
||||||
|
Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>),
|
||||||
|
ValidConfs = [Conf, Conf1, Conf2],
|
||||||
|
InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>),
|
||||||
|
InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>),
|
||||||
|
InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
|
||||||
|
InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
|
||||||
|
[?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
|
||||||
|
[?_assertThrow(_, check(C)) || C <- InvalidConfs].
|
||||||
|
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Add an option to configure TCP keepalive in Kafka bridge.
|
|
@ -38,6 +38,24 @@ socket_send_buffer.desc:
|
||||||
socket_send_buffer.label:
|
socket_send_buffer.label:
|
||||||
"""Socket Send Buffer Size"""
|
"""Socket Send Buffer Size"""
|
||||||
|
|
||||||
|
socket_receive_buffer.desc:
|
||||||
|
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
|
||||||
|
|
||||||
|
socket_receive_buffer.label:
|
||||||
|
"""Socket Receive Buffer Size"""
|
||||||
|
|
||||||
|
socket_tcp_keepalive.desc:
|
||||||
|
"""Enable TCP keepalive for Kafka bridge connections.
|
||||||
|
The value is three comma separated numbers in the format of 'Idle,Interval,Probes'
|
||||||
|
- Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
|
||||||
|
- Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
|
||||||
|
- Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
|
||||||
|
For example "240,30,5" means: TCP keepalive probes are sent after the connection is idle for 240 seconds, and the probes are sent every 30 seconds until a response is received, if it misses 5 consecutive responses, the connection should be closed.
|
||||||
|
Default: 'none'"""
|
||||||
|
|
||||||
|
socket_tcp_keepalive.label:
|
||||||
|
"""TCP keepalive options"""
|
||||||
|
|
||||||
desc_name.desc:
|
desc_name.desc:
|
||||||
"""Bridge name, used as a human-readable description of the bridge."""
|
"""Bridge name, used as a human-readable description of the bridge."""
|
||||||
|
|
||||||
|
@ -56,12 +74,6 @@ consumer_max_batch_bytes.desc:
|
||||||
consumer_max_batch_bytes.label:
|
consumer_max_batch_bytes.label:
|
||||||
"""Fetch Bytes"""
|
"""Fetch Bytes"""
|
||||||
|
|
||||||
socket_receive_buffer.desc:
|
|
||||||
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
|
|
||||||
|
|
||||||
socket_receive_buffer.label:
|
|
||||||
"""Socket Receive Buffer Size"""
|
|
||||||
|
|
||||||
consumer_topic_mapping.desc:
|
consumer_topic_mapping.desc:
|
||||||
"""Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."""
|
"""Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."""
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue