Merge pull request #8575 from HJianBo/fix-exproto-on-udp
Fix(exproto): refine keepalive timer checking
This commit is contained in:
commit
836b988b91
|
@ -20,6 +20,8 @@ File format:
|
|||
- Fixed crash when shared persistent subscription [#8441]
|
||||
- Fixed issue in Lua hook that prevented messages from being
|
||||
rejected [#8535]
|
||||
- Fix ExProto UDP client keepalive checking error.
|
||||
This causes the clients to not expire as long as a new UDP packet arrives [#8575]
|
||||
|
||||
### Enhancements
|
||||
- HTTP API(GET /rules/) support for pagination and fuzzy filtering. [#8450]
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_exproto,
|
||||
[{description, "EMQ X Extension for Protocol"},
|
||||
{vsn, "4.3.8"}, %% 4.3.3 is used by ee
|
||||
{vsn, "4.3.9"}, %% 4.3.3 is used by ee
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{mod, {emqx_exproto_app, []}},
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{<<"4\\.3\\.[6-7]">>,
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[2-5]">>,
|
||||
[{<<"4\\.3\\.[2-8]">>,
|
||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[0-1]">>,
|
||||
|
@ -12,9 +10,7 @@
|
|||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[6-7]">>,
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[2-5]">>,
|
||||
[{<<"4\\.3\\.[2-8]">>,
|
||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[0-1]">>,
|
||||
|
|
|
@ -260,7 +260,8 @@ handle_timeout(_TRef, {keepalive, StatVal},
|
|||
{ok, reset_timer(alive_timer, NChannel)};
|
||||
{error, timeout} ->
|
||||
Req = #{type => 'KEEPALIVE'},
|
||||
{ok, try_dispatch(on_timer_timeout, wrap(Req), Channel)}
|
||||
NChannel = clean_timer(alive_timer, Channel),
|
||||
{ok, try_dispatch(on_timer_timeout, wrap(Req), NChannel)}
|
||||
end;
|
||||
|
||||
handle_timeout(_TRef, force_close, Channel = #channel{closed_reason = Reason}) ->
|
||||
|
@ -327,7 +328,7 @@ handle_call({start_timer, keepalive, Interval},
|
|||
NConnInfo = ConnInfo#{keepalive => Interval},
|
||||
NClientInfo = ClientInfo#{keepalive => Interval},
|
||||
NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo},
|
||||
{reply, ok, ensure_keepalive(NChannel)};
|
||||
{reply, ok, [{event, updated}], ensure_keepalive(NChannel)};
|
||||
|
||||
handle_call({subscribe, TopicFilter, Qos},
|
||||
Channel = #channel{
|
||||
|
@ -339,13 +340,13 @@ handle_call({subscribe, TopicFilter, Qos},
|
|||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||
_ ->
|
||||
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
|
||||
{reply, ok, NChannel}
|
||||
{reply, ok, [{event, updated}], NChannel}
|
||||
end;
|
||||
|
||||
handle_call({unsubscribe, TopicFilter},
|
||||
Channel = #channel{conn_state = connected}) ->
|
||||
{ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel),
|
||||
{reply, ok, NChannel};
|
||||
{reply, ok, [{event, updated}], NChannel};
|
||||
|
||||
handle_call({publish, Topic, Qos, Payload},
|
||||
Channel = #channel{
|
||||
|
|
|
@ -211,16 +211,35 @@ esockd_setopts({esockd_transport, Socket}, Opts) ->
|
|||
%% FIXME: DTLS works??
|
||||
esockd_transport:setopts(Socket, Opts).
|
||||
|
||||
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
|
||||
inet:getstat(Sock, Stats);
|
||||
esockd_getstat({esockd_transport, Sock}, Stats) ->
|
||||
esockd_transport:getstat(Sock, Stats).
|
||||
esockd_transport:getstat(Sock, Stats);
|
||||
esockd_getstat({udp, _SockPid, _Sock}, Stats) ->
|
||||
{ok, lists:map(fun(K) -> {K, get_stats(K)} end, Stats)}.
|
||||
|
||||
send(Data, #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) ->
|
||||
send(Data, State = #state{socket = {udp, _SockPid, Sock}, peername = {Ip, Port}}) ->
|
||||
incr_send_stats(Data, State),
|
||||
gen_udp:send(Sock, Ip, Port, Data);
|
||||
send(Data, #state{socket = {esockd_transport, Sock}}) ->
|
||||
esockd_transport:async_send(Sock, Data).
|
||||
|
||||
incr_recv_stats(Data, #state{socket = {udp, _, _}}) ->
|
||||
incr_stats(recv_oct, byte_size(Data)),
|
||||
incr_stats(recv_cnt, 1).
|
||||
|
||||
incr_send_stats(Data, #state{socket = {udp, _, _}}) ->
|
||||
incr_stats(send_oct, byte_size(Data)),
|
||||
incr_stats(send_cnt, 1).
|
||||
|
||||
incr_stats(Key, Cnt) ->
|
||||
Cnt0 = get_stats(Key),
|
||||
put({stats, Key}, Cnt0 + Cnt).
|
||||
|
||||
get_stats(Key) ->
|
||||
case get({stats, Key}) of
|
||||
undefined -> 0;
|
||||
Cnt -> Cnt
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -386,6 +405,7 @@ handle_msg({'$gen_cast', Req}, State) ->
|
|||
with_channel(handle_cast, [Req], State);
|
||||
|
||||
handle_msg({datagram, _SockPid, Data}, State) ->
|
||||
incr_recv_stats(Data, State),
|
||||
process_incoming(Data, State);
|
||||
|
||||
handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
|
||||
|
@ -450,11 +470,11 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
|
|||
emqx_cm:connection_closed(ClientId),
|
||||
{ok, State};
|
||||
|
||||
%handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||
% ClientId = emqx_exproto_channel:info(clientid, Channel),
|
||||
% emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
% emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
% {ok, State};
|
||||
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
|
||||
ClientId = emqx_exproto_channel:info(clientid, Channel),
|
||||
emqx_cm:set_chan_info(ClientId, info(State)),
|
||||
emqx_cm:set_chan_stats(ClientId, stats(State)),
|
||||
{ok, State};
|
||||
|
||||
handle_msg({timeout, TRef, TMsg}, State) ->
|
||||
handle_timeout(TRef, TMsg, State);
|
||||
|
|
Loading…
Reference in New Issue