test: ensure udp client keepalive value getting right value
This commit is contained in:
parent
ebb2824e15
commit
f8614196ac
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
-include_lib("emqx/include/types.hrl").
|
-include_lib("emqx/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([
|
-export([
|
||||||
|
@ -51,6 +52,9 @@
|
||||||
%% Internal callback
|
%% Internal callback
|
||||||
-export([wakeup_from_hib/2, recvloop/2]).
|
-export([wakeup_from_hib/2, recvloop/2]).
|
||||||
|
|
||||||
|
%% for channel module
|
||||||
|
-export([keepalive_stats/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
%% TCP/SSL/UDP/DTLS Wrapped Socket
|
||||||
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
|
||||||
|
@ -573,9 +577,15 @@ terminate(
|
||||||
channel = Channel
|
channel = Channel
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
?SLOG(debug, #{msg => "conn_process_terminated", reason => Reason}),
|
|
||||||
_ = ChannMod:terminate(Reason, Channel),
|
_ = ChannMod:terminate(Reason, Channel),
|
||||||
_ = close_socket(State),
|
_ = close_socket(State),
|
||||||
|
ClientId =
|
||||||
|
try ChannMod:info(clientid, Channel) of
|
||||||
|
Id -> Id
|
||||||
|
catch
|
||||||
|
_:_ -> undefined
|
||||||
|
end,
|
||||||
|
?tp(debug, conn_process_terminated, #{reason => Reason, clientid => ClientId}),
|
||||||
exit(Reason).
|
exit(Reason).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -655,12 +665,7 @@ handle_timeout(
|
||||||
disconnected ->
|
disconnected ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
_ ->
|
_ ->
|
||||||
case keepalive_stats(Stat) of
|
handle_timeout(TRef, {Keepalive, keepalive_stats(Stat)}, State)
|
||||||
{ok, Oct} ->
|
|
||||||
handle_timeout(TRef, {Keepalive, Oct}, State);
|
|
||||||
{error, Reason} ->
|
|
||||||
handle_info({sock_error, Reason}, State)
|
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
handle_timeout(
|
handle_timeout(
|
||||||
_TRef,
|
_TRef,
|
||||||
|
|
|
@ -84,8 +84,6 @@
|
||||||
|
|
||||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
|
||||||
|
|
||||||
-define(DEFAULT_IDLE_TIMEOUT, 30000).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -154,7 +152,7 @@ init(
|
||||||
Ctx = maps:get(ctx, Options),
|
Ctx = maps:get(ctx, Options),
|
||||||
GRpcChann = maps:get(handler, Options),
|
GRpcChann = maps:get(handler, Options),
|
||||||
PoolName = maps:get(pool_name, Options),
|
PoolName = maps:get(pool_name, Options),
|
||||||
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
|
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
|
||||||
|
|
||||||
NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
|
NConnInfo = default_conninfo(ConnInfo#{idle_timeout => IdleTimeout}),
|
||||||
ListenerId =
|
ListenerId =
|
||||||
|
@ -301,7 +299,7 @@ handle_timeout(
|
||||||
Req = #{type => 'KEEPALIVE'},
|
Req = #{type => 'KEEPALIVE'},
|
||||||
NChannel = clean_timer(alive_timer, Channel),
|
NChannel = clean_timer(alive_timer, Channel),
|
||||||
%% close connection if keepalive timeout
|
%% close connection if keepalive timeout
|
||||||
Replies = [{event, disconnected}, {close, normal}],
|
Replies = [{event, disconnected}, {close, keepalive_timeout}],
|
||||||
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
{ok, Replies, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
||||||
end;
|
end;
|
||||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||||
|
@ -667,7 +665,8 @@ ensure_keepalive(Channel = #channel{clientinfo = ClientInfo}) ->
|
||||||
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
|
ensure_keepalive_timer(Interval, Channel) when Interval =< 0 ->
|
||||||
Channel;
|
Channel;
|
||||||
ensure_keepalive_timer(Interval, Channel) ->
|
ensure_keepalive_timer(Interval, Channel) ->
|
||||||
Keepalive = emqx_keepalive:init(timer:seconds(Interval)),
|
StatVal = emqx_gateway_conn:keepalive_stats(recv_oct),
|
||||||
|
Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)),
|
||||||
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
|
||||||
|
|
||||||
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
ensure_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx_hooks.hrl").
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-import(
|
-import(
|
||||||
emqx_exproto_echo_svr,
|
emqx_exproto_echo_svr,
|
||||||
|
@ -38,6 +39,7 @@
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-define(TCPOPTS, [binary, {active, false}]).
|
-define(TCPOPTS, [binary, {active, false}]).
|
||||||
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
|
-define(DTLSOPTS, [binary, {active, false}, {protocol, dtls}]).
|
||||||
|
@ -223,14 +225,16 @@ t_acl_deny(Cfg) ->
|
||||||
close(Sock).
|
close(Sock).
|
||||||
|
|
||||||
t_keepalive_timeout(Cfg) ->
|
t_keepalive_timeout(Cfg) ->
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
SockType = proplists:get_value(listener_type, Cfg),
|
SockType = proplists:get_value(listener_type, Cfg),
|
||||||
Sock = open(SockType),
|
Sock = open(SockType),
|
||||||
|
|
||||||
|
ClientId1 = <<"keepalive_test_client1">>,
|
||||||
Client = #{
|
Client = #{
|
||||||
proto_name => <<"demo">>,
|
proto_name => <<"demo">>,
|
||||||
proto_ver => <<"v0.1">>,
|
proto_ver => <<"v0.1">>,
|
||||||
clientid => <<"test_client_1">>,
|
clientid => ClientId1,
|
||||||
keepalive => 2
|
keepalive => 5
|
||||||
},
|
},
|
||||||
Password = <<"123456">>,
|
Password = <<"123456">>,
|
||||||
|
|
||||||
|
@ -238,18 +242,41 @@ t_keepalive_timeout(Cfg) ->
|
||||||
ConnAckBin = frame_connack(0),
|
ConnAckBin = frame_connack(0),
|
||||||
|
|
||||||
send(Sock, ConnBin),
|
send(Sock, ConnBin),
|
||||||
{ok, ConnAckBin} = recv(Sock, 5000),
|
{ok, ConnAckBin} = recv(Sock),
|
||||||
|
|
||||||
%% Timed out connections are closed immediately,
|
case SockType of
|
||||||
%% so there may not be a disconnect message here
|
udp ->
|
||||||
%%DisconnectBin = frame_disconnect(),
|
%% another udp client should not affect the first
|
||||||
%%{ok, DisconnectBin} = recv(Sock, 10000),
|
%% udp client keepalive check
|
||||||
|
timer:sleep(4000),
|
||||||
SockType =/= udp andalso
|
Sock2 = open(SockType),
|
||||||
begin
|
ConnBin2 = frame_connect(
|
||||||
{error, closed} = recv(Sock, 5000)
|
Client#{clientid => <<"keepalive_test_client2">>},
|
||||||
end,
|
Password
|
||||||
ok.
|
),
|
||||||
|
send(Sock2, ConnBin2),
|
||||||
|
%% first client will be keepalive timeouted in 6s
|
||||||
|
?assertMatch(
|
||||||
|
{ok, #{
|
||||||
|
clientid := ClientId1,
|
||||||
|
reason := {shutdown, {sock_closed, keepalive_timeout}}
|
||||||
|
}},
|
||||||
|
?block_until(#{?snk_kind := conn_process_terminated}, 8000)
|
||||||
|
);
|
||||||
|
_ ->
|
||||||
|
?assertMatch(
|
||||||
|
{ok, #{
|
||||||
|
clientid := ClientId1,
|
||||||
|
reason := {shutdown, {sock_closed, keepalive_timeout}}
|
||||||
|
}},
|
||||||
|
?block_until(#{?snk_kind := conn_process_terminated}, 12000)
|
||||||
|
),
|
||||||
|
Trace = snabbkaffe:collect_trace(),
|
||||||
|
%% conn process should be terminated
|
||||||
|
?assertEqual(1, length(?of_kind(conn_process_terminated, Trace))),
|
||||||
|
%% socket port should be closed
|
||||||
|
?assertEqual({error, closed}, recv(Sock, 5000))
|
||||||
|
end.
|
||||||
|
|
||||||
t_hook_connected_disconnected(Cfg) ->
|
t_hook_connected_disconnected(Cfg) ->
|
||||||
SockType = proplists:get_value(listener_type, Cfg),
|
SockType = proplists:get_value(listener_type, Cfg),
|
||||||
|
@ -424,6 +451,9 @@ send({ssl, Sock}, Bin) ->
|
||||||
send({dtls, Sock}, Bin) ->
|
send({dtls, Sock}, Bin) ->
|
||||||
ssl:send(Sock, Bin).
|
ssl:send(Sock, Bin).
|
||||||
|
|
||||||
|
recv(Sock) ->
|
||||||
|
recv(Sock, infinity).
|
||||||
|
|
||||||
recv({tcp, Sock}, Ts) ->
|
recv({tcp, Sock}, Ts) ->
|
||||||
gen_tcp:recv(Sock, 0, Ts);
|
gen_tcp:recv(Sock, 0, Ts);
|
||||||
recv({udp, Sock}, Ts) ->
|
recv({udp, Sock}, Ts) ->
|
||||||
|
|
Loading…
Reference in New Issue