diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index cf505a873..b15a2ff79 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -111,6 +111,7 @@ start_link() -> insert_channel_info(ClientId, Info, Stats) -> Chan = {ClientId, self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), + ?tp(debug, insert_channel_info, #{client_id => ClientId}), ok. %% @private diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 8b4940308..814fd9007 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -41,8 +41,13 @@ , stats/1 ]). +-export([ async_set_keepalive/4 + , async_set_socket_options/2 + ]). + -export([ call/2 , call/3 + , cast/2 ]). %% Callback @@ -56,7 +61,7 @@ ]). %% Internal callback --export([wakeup_from_hib/2, recvloop/2]). +-export([wakeup_from_hib/2, recvloop/2, get_state/1]). %% Export for CT -export([set_field/3]). @@ -184,6 +189,35 @@ stats(#state{transport = Transport, ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). +%% @doc Set TCP keepalive socket options to override system defaults. +%% Idle: The number of seconds a connection needs to be idle before +%% TCP begins sending out keep-alive probes (Linux default 7200). +%% Interval: The number of seconds between TCP keep-alive probes +%% (Linux default 75). +%% Probes: The maximum number of TCP keep-alive probes to send before +%% giving up and killing the connection if no response is +%% obtained from the other end (Linux default 9). +%% +%% NOTE: This API sets TCP socket options, which has nothing to do with +%% the MQTT layer's keepalive (PINGREQ and PINGRESP). +async_set_keepalive(Pid, Idle, Interval, Probes) -> + Options = [ {keepalive, true} + , {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ], + async_set_socket_options(Pid, Options). + +%% @doc Set custom socket options. +%% This API is made async because the call might be originated from +%% a hookpoint callback (otherwise deadlock). +%% If failed to set, the error message is logged. +async_set_socket_options(Pid, Options) -> + cast(Pid, {async_set_socket_options, Options}). + +cast(Pid, Req) -> + gen_server:cast(Pid, Req). + call(Pid, Req) -> call(Pid, Req, infinity). call(Pid, Req, Timeout) -> @@ -366,6 +400,9 @@ handle_msg({'$gen_call', From, Req}, State) -> gen_server:reply(From, Reply), stop(Reason, NState) end; +handle_msg({'$gen_cast', Req}, State) -> + NewState = handle_cast(Req, State), + {ok, NewState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> ?LOG(debug, "RECV ~0p", [Data]), @@ -692,6 +729,22 @@ handle_info({sock_error, Reason}, State) -> handle_info(Info, State) -> with_channel(handle_info, [Info], State). +%%-------------------------------------------------------------------- +%% Handle Info + +handle_cast({async_set_socket_options, Opts}, + State = #state{transport = Transport, + socket = Socket + }) -> + case Transport:setopts(Socket, Opts) of + ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts}); + Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) + end, + State; +handle_cast(Req, State) -> + ?tp(error, "received_unknown_cast", #{cast => Req}), + State. + %%-------------------------------------------------------------------- %% Ensure rate limit @@ -820,3 +873,7 @@ set_field(Name, Value, State) -> Pos = emqx_misc:index_of(Name, record_info(fields, state)), setelement(Pos+1, State, Value). +get_state(Pid) -> + State = sys:get_state(Pid), + maps:from_list(lists:zip(record_info(fields, state), + tl(tuple_to_list(State)))). diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index cb6174712..c86d6334a 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(STATS_KYES, [recv_pkt, recv_msg, send_pkt, send_msg, recv_oct, recv_cnt, send_oct, send_cnt, @@ -38,6 +39,19 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +init_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase(init, Config); + false -> Config + end. + +end_per_testcase(TestCase, Config) -> + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase('end', Config); + false -> ok + end, + Config. + t_conn_stats(_) -> with_client(fun(CPid) -> Stats = emqx_connection:stats(CPid), @@ -134,3 +148,41 @@ with_client(TestFun, _Options) -> emqtt:stop(C) end. +t_async_set_keepalive(init, Config) -> + ok = snabbkaffe:start_trace(), + Config; +t_async_set_keepalive('end', _Config) -> + snabbkaffe:stop(), + ok. + +t_async_set_keepalive(_) -> + ClientID = <<"client-tcp-keepalive">>, + {ok, Client} = emqtt:start_link([{host, "localhost"}, + {proto_ver,v5}, + {clientid, ClientID}, + {clean_start, false}]), + {ok, _} = emqtt:connect(Client), + {ok, _} = ?block_until(#{?snk_kind := insert_channel_info, + client_id := ClientID}, 2000, 100), + [Pid] = emqx_cm:lookup_channels(ClientID), + State = emqx_connection:get_state(Pid), + Transport = maps:get(transport, State), + Socket = maps:get(socket, State), + ?assert(is_port(Socket)), + Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}], + {ok, [ {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ]} = Transport:getopts(Socket, Opts), + ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]), + emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1), + {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000), + {ok, [ {raw, 6, 4, <>} + , {raw, 6, 5, <>} + , {raw, 6, 6, <>} + ]} = Transport:getopts(Socket, Opts), + ?assertEqual(NewIdle, Idle + 1), + ?assertEqual(NewInterval, Interval + 1), + ?assertEqual(NewProbes, Probes + 1), + emqtt:stop(Client), + ok.