feat(gw-conn): support the incoming keepalive oct
This commit is contained in:
parent
ef6a38bfd2
commit
0f79ffca01
|
@ -568,16 +568,22 @@ handle_timeout(_TRef, limit_timeout, State) ->
|
||||||
limit_timer = undefined
|
limit_timer = undefined
|
||||||
},
|
},
|
||||||
handle_info(activate_socket, NState);
|
handle_info(activate_socket, NState);
|
||||||
handle_timeout(TRef, keepalive, State = #state{
|
handle_timeout(TRef, Keepalive, State = #state{
|
||||||
chann_mod = ChannMod,
|
chann_mod = ChannMod,
|
||||||
socket = Socket,
|
socket = Socket,
|
||||||
channel = Channel})->
|
channel = Channel})
|
||||||
|
when Keepalive == keepalive;
|
||||||
|
Keepalive == keepalive_send ->
|
||||||
|
Stat = case Keepalive of
|
||||||
|
keepalive -> recv_oct;
|
||||||
|
keepalive_send -> send_oct
|
||||||
|
end,
|
||||||
case ChannMod:info(conn_state, Channel) of
|
case ChannMod:info(conn_state, Channel) of
|
||||||
disconnected -> {ok, State};
|
disconnected -> {ok, State};
|
||||||
_ ->
|
_ ->
|
||||||
case esockd_getstat(Socket, [recv_oct, send_oct]) of
|
case esockd_getstat(Socket, [Stat]) of
|
||||||
{ok, [{recv_oct, RecvOct}, {send_oct, SendOct}]} ->
|
{ok, [{Stat, RecvOct}]} ->
|
||||||
handle_timeout(TRef, {keepalive, {RecvOct, SendOct}}, State);
|
handle_timeout(TRef, {Keepalive, RecvOct}, State);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
handle_info({sock_error, Reason}, State)
|
handle_info({sock_error, Reason}, State)
|
||||||
end
|
end
|
||||||
|
|
|
@ -1272,7 +1272,7 @@ handle_timeout(_TRef, {keepalive, _StatVal},
|
||||||
when ConnState =:= disconnected;
|
when ConnState =:= disconnected;
|
||||||
ConnState =:= asleep ->
|
ConnState =:= asleep ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
handle_timeout(_TRef, {keepalive, {StatVal, _}},
|
handle_timeout(_TRef, {keepalive, StatVal},
|
||||||
Channel = #channel{keepalive = Keepalive}) ->
|
Channel = #channel{keepalive = Keepalive}) ->
|
||||||
case emqx_keepalive:check(StatVal, Keepalive) of
|
case emqx_keepalive:check(StatVal, Keepalive) of
|
||||||
{ok, NKeepalive} ->
|
{ok, NKeepalive} ->
|
||||||
|
|
|
@ -42,6 +42,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ handle_call/2
|
-export([ handle_call/2
|
||||||
|
, handle_cast/2
|
||||||
, handle_info/2
|
, handle_info/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -84,8 +85,8 @@
|
||||||
-type(replies() :: stomp_frame() | reply() | [reply()]).
|
-type(replies() :: stomp_frame() | reply() | [reply()]).
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
incoming_timer => incoming,
|
incoming_timer => keepalive,
|
||||||
outgoing_timer => outgoing,
|
outgoing_timer => keepalive_send,
|
||||||
clean_trans_timer => clean_trans
|
clean_trans_timer => clean_trans
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -740,7 +741,7 @@ handle_deliver(Delivers,
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}).
|
| {shutdown, Reason :: term(), channel()}).
|
||||||
|
|
||||||
handle_timeout(_TRef, {incoming, NewVal},
|
handle_timeout(_TRef, {keepalive, NewVal},
|
||||||
Channel = #channel{heartbeat = HrtBt}) ->
|
Channel = #channel{heartbeat = HrtBt}) ->
|
||||||
case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of
|
case emqx_stomp_heartbeat:check(incoming, NewVal, HrtBt) of
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
|
@ -751,7 +752,7 @@ handle_timeout(_TRef, {incoming, NewVal},
|
||||||
)}
|
)}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, {outgoing, NewVal},
|
handle_timeout(_TRef, {keepalive_send, NewVal},
|
||||||
Channel = #channel{heartbeat = HrtBt}) ->
|
Channel = #channel{heartbeat = HrtBt}) ->
|
||||||
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
|
|
Loading…
Reference in New Issue