Merge pull request #4851 from zmstone/tcp-keepalive

feat: async API to support tcp keepalive inet options
This commit is contained in:
Zaiming (Stone) Shi 2021-05-24 21:21:28 +02:00 committed by GitHub
commit b24ae5925a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 1 deletions

View File

@ -111,6 +111,7 @@ start_link() ->
insert_channel_info(ClientId, Info, Stats) -> insert_channel_info(ClientId, Info, Stats) ->
Chan = {ClientId, self()}, Chan = {ClientId, self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
?tp(debug, insert_channel_info, #{client_id => ClientId}),
ok. ok.
%% @private %% @private

View File

@ -41,8 +41,13 @@
, stats/1 , stats/1
]). ]).
-export([ async_set_keepalive/4
, async_set_socket_options/2
]).
-export([ call/2 -export([ call/2
, call/3 , call/3
, cast/2
]). ]).
%% Callback %% Callback
@ -56,7 +61,7 @@
]). ]).
%% Internal callback %% Internal callback
-export([wakeup_from_hib/2, recvloop/2]). -export([wakeup_from_hib/2, recvloop/2, get_state/1]).
%% Export for CT %% Export for CT
-export([set_field/3]). -export([set_field/3]).
@ -184,6 +189,35 @@ stats(#state{transport = Transport,
ProcStats = emqx_misc:proc_stats(), ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]). lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
%% @doc Set TCP keepalive socket options to override system defaults.
%% Idle: The number of seconds a connection needs to be idle before
%% TCP begins sending out keep-alive probes (Linux default 7200).
%% Interval: The number of seconds between TCP keep-alive probes
%% (Linux default 75).
%% Probes: The maximum number of TCP keep-alive probes to send before
%% giving up and killing the connection if no response is
%% obtained from the other end (Linux default 9).
%%
%% NOTE: This API sets TCP socket options, which has nothing to do with
%% the MQTT layer's keepalive (PINGREQ and PINGRESP).
async_set_keepalive(Pid, Idle, Interval, Probes) ->
Options = [ {keepalive, true}
, {raw, 6, 4, <<Idle:32/native>>}
, {raw, 6, 5, <<Interval:32/native>>}
, {raw, 6, 6, <<Probes:32/native>>}
],
async_set_socket_options(Pid, Options).
%% @doc Set custom socket options.
%% This API is made async because the call might be originated from
%% a hookpoint callback (otherwise deadlock).
%% If failed to set, the error message is logged.
async_set_socket_options(Pid, Options) ->
cast(Pid, {async_set_socket_options, Options}).
cast(Pid, Req) ->
gen_server:cast(Pid, Req).
call(Pid, Req) -> call(Pid, Req) ->
call(Pid, Req, infinity). call(Pid, Req, infinity).
call(Pid, Req, Timeout) -> call(Pid, Req, Timeout) ->
@ -366,6 +400,9 @@ handle_msg({'$gen_call', From, Req}, State) ->
gen_server:reply(From, Reply), gen_server:reply(From, Reply),
stop(Reason, NState) stop(Reason, NState)
end; end;
handle_msg({'$gen_cast', Req}, State) ->
NewState = handle_cast(Req, State),
{ok, NewState};
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
?LOG(debug, "RECV ~0p", [Data]), ?LOG(debug, "RECV ~0p", [Data]),
@ -692,6 +729,22 @@ handle_info({sock_error, Reason}, State) ->
handle_info(Info, State) -> handle_info(Info, State) ->
with_channel(handle_info, [Info], State). with_channel(handle_info, [Info], State).
%%--------------------------------------------------------------------
%% Handle Info
handle_cast({async_set_socket_options, Opts},
State = #state{transport = Transport,
socket = Socket
}) ->
case Transport:setopts(Socket, Opts) of
ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
end,
State;
handle_cast(Req, State) ->
?tp(error, "received_unknown_cast", #{cast => Req}),
State.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure rate limit %% Ensure rate limit
@ -820,3 +873,7 @@ set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)), Pos = emqx_misc:index_of(Name, record_info(fields, state)),
setelement(Pos+1, State, Value). setelement(Pos+1, State, Value).
get_state(Pid) ->
State = sys:get_state(Pid),
maps:from_list(lists:zip(record_info(fields, state),
tl(tuple_to_list(State)))).

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg,
recv_oct, recv_cnt, send_oct, send_cnt, recv_oct, recv_cnt, send_oct, send_cnt,
@ -38,6 +39,19 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
init_per_testcase(TestCase, Config) ->
case erlang:function_exported(?MODULE, TestCase, 2) of
true -> ?MODULE:TestCase(init, Config);
false -> Config
end.
end_per_testcase(TestCase, Config) ->
case erlang:function_exported(?MODULE, TestCase, 2) of
true -> ?MODULE:TestCase('end', Config);
false -> ok
end,
Config.
t_conn_stats(_) -> t_conn_stats(_) ->
with_client(fun(CPid) -> with_client(fun(CPid) ->
Stats = emqx_connection:stats(CPid), Stats = emqx_connection:stats(CPid),
@ -134,3 +148,41 @@ with_client(TestFun, _Options) ->
emqtt:stop(C) emqtt:stop(C)
end. end.
t_async_set_keepalive(init, Config) ->
ok = snabbkaffe:start_trace(),
Config;
t_async_set_keepalive('end', _Config) ->
snabbkaffe:stop(),
ok.
t_async_set_keepalive(_) ->
ClientID = <<"client-tcp-keepalive">>,
{ok, Client} = emqtt:start_link([{host, "localhost"},
{proto_ver,v5},
{clientid, ClientID},
{clean_start, false}]),
{ok, _} = emqtt:connect(Client),
{ok, _} = ?block_until(#{?snk_kind := insert_channel_info,
client_id := ClientID}, 2000, 100),
[Pid] = emqx_cm:lookup_channels(ClientID),
State = emqx_connection:get_state(Pid),
Transport = maps:get(transport, State),
Socket = maps:get(socket, State),
?assert(is_port(Socket)),
Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
{ok, [ {raw, 6, 4, <<Idle:32/native>>}
, {raw, 6, 5, <<Interval:32/native>>}
, {raw, 6, 6, <<Probes:32/native>>}
]} = Transport:getopts(Socket, Opts),
ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
{ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
{ok, [ {raw, 6, 4, <<NewIdle:32/native>>}
, {raw, 6, 5, <<NewInterval:32/native>>}
, {raw, 6, 6, <<NewProbes:32/native>>}
]} = Transport:getopts(Socket, Opts),
?assertEqual(NewIdle, Idle + 1),
?assertEqual(NewInterval, Interval + 1),
?assertEqual(NewProbes, Probes + 1),
emqtt:stop(Client),
ok.