chore(exproto): start idle timer for udp clients

This commit is contained in:
JianBo He 2022-08-02 10:36:43 +08:00
parent 763f0a852d
commit 9e2987034b
1 changed files with 28 additions and 4 deletions

View File

@ -76,7 +76,8 @@
-define(TIMER_TABLE, #{ -define(TIMER_TABLE, #{
alive_timer => keepalive, alive_timer => keepalive,
force_timer => force_close force_timer => force_close,
idle_timer => force_close_idle
}). }).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -94,6 +95,8 @@
awaiting_rel_max awaiting_rel_max
]). ]).
-define(DEFAULT_IDLE_TIMEOUT, 30000).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Attrs and Caps %% Info, Attrs and Caps
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -149,8 +152,12 @@ init(ConnInfo = #{socktype := Socktype,
GRpcChann = proplists:get_value(handler, Options), GRpcChann = proplists:get_value(handler, Options),
NConnInfo = default_conninfo(ConnInfo), NConnInfo = default_conninfo(ConnInfo),
ClientInfo = default_clientinfo(ConnInfo), ClientInfo = default_clientinfo(ConnInfo),
IdleTimeout = proplists:get_value(idle_timeout, Options, ?DEFAULT_IDLE_TIMEOUT),
NConnInfo1 = NConnInfo#{idle_timeout => IdleTimeout},
Channel = #channel{gcli = #{channel => GRpcChann}, Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo, conninfo = NConnInfo1,
clientinfo = ClientInfo, clientinfo = ClientInfo,
conn_state = accepted, conn_state = accepted,
timers = #{} timers = #{}
@ -165,7 +172,8 @@ init(ConnInfo = #{socktype := Socktype,
#{socktype => socktype(Socktype), #{socktype => socktype(Socktype),
peername => address(Peername), peername => address(Peername),
sockname => address(Sockname)})}, sockname => address(Sockname)})},
try_dispatch(on_socket_created, wrap(Req), Channel) start_idle_checking_timer(
try_dispatch(on_socket_created, wrap(Req), Channel))
end. end.
register_the_anonymous_client(ClientInfo, ConnInfo) -> register_the_anonymous_client(ClientInfo, ConnInfo) ->
@ -184,6 +192,12 @@ register_the_anonymous_client(ClientInfo, ConnInfo) ->
unregister_the_anonymous_client(ClientId) -> unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId). emqx_cm:unregister_channel(ClientId).
start_idle_checking_timer(Channel = #channel{conninfo = #{socktype := udp}}) ->
ensure_timer(idle_timer, Channel);
start_idle_checking_timer(Channel) ->
Channel.
%% @private %% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl; peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined -> NoSsl == undefined ->
@ -267,6 +281,9 @@ handle_timeout(_TRef, {keepalive, StatVal},
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) -> handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
{shutdown, Reason, Channel}; {shutdown, Reason, Channel};
handle_timeout(_TRef, force_close_idle, Channel) ->
{shutdown, idle_timeout, Channel};
handle_timeout(_TRef, Msg, Channel) -> handle_timeout(_TRef, Msg, Channel) ->
?WARN("Unexpected timeout: ~p", [Msg]), ?WARN("Unexpected timeout: ~p", [Msg]),
{ok, Channel}. {ok, Channel}.
@ -328,7 +345,8 @@ handle_call({start_timer, keepalive, Interval},
NConnInfo = ConnInfo#{keepalive => Interval}, NConnInfo = ConnInfo#{keepalive => Interval},
NClientInfo = ClientInfo#{keepalive => Interval}, NClientInfo = ClientInfo#{keepalive => Interval},
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)}; {reply, ok, [{event, updated}],
ensure_keepalive(cancel_timer(idle_timer, NChannel))};
handle_call({subscribe, TopicFilter, Qos}, handle_call({subscribe, TopicFilter, Qos},
Channel = #channel{ Channel = #channel{
@ -561,6 +579,12 @@ reset_timer(Name, Channel) ->
clean_timer(Name, Channel = #channel{timers = Timers}) -> clean_timer(Name, Channel = #channel{timers = Timers}) ->
Channel#channel{timers = maps:remove(Name, Timers)}. Channel#channel{timers = maps:remove(Name, Timers)}.
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
clean_timer(Name, Channel).
interval(idle_timer, #channel{conninfo = #{idle_timeout := IdleTimeout}}) ->
IdleTimeout;
interval(force_timer, _) -> interval(force_timer, _) ->
15000; 15000;
interval(alive_timer, #channel{keepalive = Keepalive}) -> interval(alive_timer, #channel{keepalive = Keepalive}) ->