diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 467ac20a2..5e3461c52 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -86,7 +86,6 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). -define(DEF_IDLE_TIME, timer:seconds(30)). --define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). @@ -150,8 +149,7 @@ init( mountpoint => Mountpoint } ), - %% FIXME: it should coap.hearbeat instead of idle_timeout? - Heartbeat = ?GET_IDLE_TIME(Config), + Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME), #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -179,8 +177,8 @@ send_request(Channel, Request) -> | {ok, replies(), channel()} | {shutdown, Reason :: term(), channel()} | {shutdown, Reason :: term(), replies(), channel()}. -handle_in(Msg, ChannleT) -> - Channel = ensure_keepalive_timer(ChannleT), +handle_in(Msg, Channel0) -> + Channel = ensure_keepalive_timer(Channel0), case emqx_coap_message:is_request(Msg) of true -> check_auth_state(Msg, Channel); @@ -321,6 +319,9 @@ handle_call(Req, _From, Channel) -> handle_cast(close, Channel) -> ?SLOG(info, #{msg => "close_connection"}), shutdown(normal, Channel); +handle_cast(inc_recv_pkt, Channel) -> + _ = emqx_pd:inc_counter(recv_pkt, 1), + {ok, Channel}; handle_cast(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {ok, Channel}. @@ -455,6 +456,13 @@ check_token( Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), {shutdown, normal, Reply, Channel}; true -> + %% hack: since each message request can spawn a new connection + %% process, we can't rely on the `inc_incoming_stats' call in + %% `emqx_gateway_conn:handle_incoming' to properly keep track of + %% bumping incoming requests for an existing channel. Since this + %% number is used by keepalive, we have to bump it inside the + %% requested channel/connection pid so heartbeats actually work. + emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt), call_session(handle_request, Msg, Channel) end; _ -> diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index 4459d84f1..c066b84ff 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -83,10 +83,26 @@ init_per_testcase(t_connection_with_authn_failed, Config) -> fun(_) -> {error, bad_username_or_password} end ), Config; +init_per_testcase(t_heartbeat, Config) -> + NewHeartbeat = 800, + OldConf = emqx:get_raw_config([gateway, coap]), + {ok, _} = emqx_gateway_conf:update_gateway( + coap, + OldConf#{<<"heartbeat">> => <<"800ms">>} + ), + [ + {old_conf, OldConf}, + {new_heartbeat, NewHeartbeat} + | Config + ]; init_per_testcase(_, Config) -> ok = meck:new(emqx_access_control, [passthrough]), Config. +end_per_testcase(t_heartbeat, Config) -> + OldConf = ?config(old_conf, Config), + {ok, _} = emqx_gateway_conf:update_gateway(coap, OldConf), + ok; end_per_testcase(_, Config) -> ok = meck:unload(emqx_access_control), Config. @@ -123,13 +139,49 @@ t_connection(_) -> ), %% heartbeat - HeartURI = - ?MQTT_PREFIX ++ - "/connection?clientid=client1&token=" ++ - Token, + {ok, changed, _} = send_heartbeat(Token), - ?LOGT("send heartbeat request:~ts~n", [HeartURI]), - {ok, changed, _} = er_coap_client:request(put, HeartURI), + disconnection(Channel, Token), + + timer:sleep(100), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) + ) + end, + do(Action). + +t_heartbeat(Config) -> + Heartbeat = ?config(new_heartbeat, Config), + Action = fun(Channel) -> + Token = connection(Channel), + + timer:sleep(100), + ?assertNotEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) + ), + + %% must keep client connection alive + Delay = Heartbeat div 2, + lists:foreach( + fun(_) -> + ?assertMatch({ok, changed, _}, send_heartbeat(Token)), + timer:sleep(Delay) + end, + lists:seq(1, 5) + ), + + ?assertNotEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) + ), + + timer:sleep(Heartbeat * 2), + ?assertEqual( + [], + emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) + ), disconnection(Channel, Token), @@ -491,6 +543,15 @@ t_connectionless_pubsub(_) -> %%-------------------------------------------------------------------- %% helpers +send_heartbeat(Token) -> + HeartURI = + ?MQTT_PREFIX ++ + "/connection?clientid=client1&token=" ++ + Token, + + ?LOGT("send heartbeat request:~ts~n", [HeartURI]), + er_coap_client:request(put, HeartURI). + connection(Channel) -> URI = ?MQTT_PREFIX ++ diff --git a/changes/ce/fix-11791.en.md b/changes/ce/fix-11791.en.md new file mode 100644 index 000000000..983347605 --- /dev/null +++ b/changes/ce/fix-11791.en.md @@ -0,0 +1 @@ +Fixed an issue that prevented heartbeats from correctly keeping the CoAP Gateway connections alive.