fix(coap): increase received packet counter for keepalive
Fixes https://emqx.atlassian.net/browse/EMQX-11193 Fixes https://github.com/emqx/emqx/issues/11779
This commit is contained in:
parent
16cc816bd3
commit
5b9866f630
|
@ -86,7 +86,6 @@
|
||||||
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
|
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]).
|
||||||
|
|
||||||
-define(DEF_IDLE_TIME, timer:seconds(30)).
|
-define(DEF_IDLE_TIME, timer:seconds(30)).
|
||||||
-define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)).
|
|
||||||
|
|
||||||
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
|
-import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]).
|
||||||
|
|
||||||
|
@ -150,8 +149,7 @@ init(
|
||||||
mountpoint => Mountpoint
|
mountpoint => Mountpoint
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
%% FIXME: it should coap.hearbeat instead of idle_timeout?
|
Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME),
|
||||||
Heartbeat = ?GET_IDLE_TIME(Config),
|
|
||||||
#channel{
|
#channel{
|
||||||
ctx = Ctx,
|
ctx = Ctx,
|
||||||
conninfo = ConnInfo,
|
conninfo = ConnInfo,
|
||||||
|
@ -179,8 +177,8 @@ send_request(Channel, Request) ->
|
||||||
| {ok, replies(), channel()}
|
| {ok, replies(), channel()}
|
||||||
| {shutdown, Reason :: term(), channel()}
|
| {shutdown, Reason :: term(), channel()}
|
||||||
| {shutdown, Reason :: term(), replies(), channel()}.
|
| {shutdown, Reason :: term(), replies(), channel()}.
|
||||||
handle_in(Msg, ChannleT) ->
|
handle_in(Msg, Channel0) ->
|
||||||
Channel = ensure_keepalive_timer(ChannleT),
|
Channel = ensure_keepalive_timer(Channel0),
|
||||||
case emqx_coap_message:is_request(Msg) of
|
case emqx_coap_message:is_request(Msg) of
|
||||||
true ->
|
true ->
|
||||||
check_auth_state(Msg, Channel);
|
check_auth_state(Msg, Channel);
|
||||||
|
@ -321,6 +319,9 @@ handle_call(Req, _From, Channel) ->
|
||||||
handle_cast(close, Channel) ->
|
handle_cast(close, Channel) ->
|
||||||
?SLOG(info, #{msg => "close_connection"}),
|
?SLOG(info, #{msg => "close_connection"}),
|
||||||
shutdown(normal, Channel);
|
shutdown(normal, Channel);
|
||||||
|
handle_cast(inc_recv_pkt, Channel) ->
|
||||||
|
_ = emqx_pd:inc_counter(recv_pkt, 1),
|
||||||
|
{ok, Channel};
|
||||||
handle_cast(Req, Channel) ->
|
handle_cast(Req, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
@ -455,6 +456,13 @@ check_token(
|
||||||
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
|
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
|
||||||
{shutdown, normal, Reply, Channel};
|
{shutdown, normal, Reply, Channel};
|
||||||
true ->
|
true ->
|
||||||
|
%% hack: since each message request can spawn a new connection
|
||||||
|
%% process, we can't rely on the `inc_incoming_stats' call in
|
||||||
|
%% `emqx_gateway_conn:handle_incoming' to properly keep track of
|
||||||
|
%% bumping incoming requests for an existing channel. Since this
|
||||||
|
%% number is used by keepalive, we have to bump it inside the
|
||||||
|
%% requested channel/connection pid so heartbeats actually work.
|
||||||
|
emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
|
||||||
call_session(handle_request, Msg, Channel)
|
call_session(handle_request, Msg, Channel)
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -83,10 +83,26 @@ init_per_testcase(t_connection_with_authn_failed, Config) ->
|
||||||
fun(_) -> {error, bad_username_or_password} end
|
fun(_) -> {error, bad_username_or_password} end
|
||||||
),
|
),
|
||||||
Config;
|
Config;
|
||||||
|
init_per_testcase(t_heartbeat, Config) ->
|
||||||
|
NewHeartbeat = 800,
|
||||||
|
OldConf = emqx:get_raw_config([gateway, coap]),
|
||||||
|
{ok, _} = emqx_gateway_conf:update_gateway(
|
||||||
|
coap,
|
||||||
|
OldConf#{<<"heartbeat">> => <<"800ms">>}
|
||||||
|
),
|
||||||
|
[
|
||||||
|
{old_conf, OldConf},
|
||||||
|
{new_heartbeat, NewHeartbeat}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
ok = meck:new(emqx_access_control, [passthrough]),
|
ok = meck:new(emqx_access_control, [passthrough]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_heartbeat, Config) ->
|
||||||
|
OldConf = ?config(old_conf, Config),
|
||||||
|
{ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf),
|
||||||
|
ok;
|
||||||
end_per_testcase(_, Config) ->
|
end_per_testcase(_, Config) ->
|
||||||
ok = meck:unload(emqx_access_control),
|
ok = meck:unload(emqx_access_control),
|
||||||
Config.
|
Config.
|
||||||
|
@ -123,13 +139,49 @@ t_connection(_) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
%% heartbeat
|
%% heartbeat
|
||||||
HeartURI =
|
{ok, changed, _} = send_heartbeat(Token),
|
||||||
?MQTT_PREFIX ++
|
|
||||||
"/connection?clientid=client1&token=" ++
|
|
||||||
Token,
|
|
||||||
|
|
||||||
?LOGT("send heartbeat request:~ts~n", [HeartURI]),
|
disconnection(Channel, Token),
|
||||||
{ok, changed, _} = er_coap_client:request(put, HeartURI),
|
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
do(Action).
|
||||||
|
|
||||||
|
t_heartbeat(Config) ->
|
||||||
|
Heartbeat = ?config(new_heartbeat, Config),
|
||||||
|
Action = fun(Channel) ->
|
||||||
|
Token = connection(Channel),
|
||||||
|
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertNotEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
%% must keep client connection alive
|
||||||
|
Delay = Heartbeat div 2,
|
||||||
|
lists:foreach(
|
||||||
|
fun(_) ->
|
||||||
|
?assertMatch({ok, changed, _}, send_heartbeat(Token)),
|
||||||
|
timer:sleep(Delay)
|
||||||
|
end,
|
||||||
|
lists:seq(1, 5)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertNotEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
),
|
||||||
|
|
||||||
|
timer:sleep(Heartbeat * 2),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
|
||||||
|
),
|
||||||
|
|
||||||
disconnection(Channel, Token),
|
disconnection(Channel, Token),
|
||||||
|
|
||||||
|
@ -491,6 +543,15 @@ t_connectionless_pubsub(_) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% helpers
|
%% helpers
|
||||||
|
|
||||||
|
send_heartbeat(Token) ->
|
||||||
|
HeartURI =
|
||||||
|
?MQTT_PREFIX ++
|
||||||
|
"/connection?clientid=client1&token=" ++
|
||||||
|
Token,
|
||||||
|
|
||||||
|
?LOGT("send heartbeat request:~ts~n", [HeartURI]),
|
||||||
|
er_coap_client:request(put, HeartURI).
|
||||||
|
|
||||||
connection(Channel) ->
|
connection(Channel) ->
|
||||||
URI =
|
URI =
|
||||||
?MQTT_PREFIX ++
|
?MQTT_PREFIX ++
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue that prevented heartbeats from correctly keeping the CoAP Gateway connections alive.
|
Loading…
Reference in New Issue