diff --git a/apps/emqx_sn/src/emqx_sn.app.src b/apps/emqx_sn/src/emqx_sn.app.src index b0453c774..3d7db1b02 100644 --- a/apps/emqx_sn/src/emqx_sn.app.src +++ b/apps/emqx_sn/src/emqx_sn.app.src @@ -1,6 +1,6 @@ {application, emqx_sn, [{description, "EMQ X MQTT-SN Plugin"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,esockd]}, diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src new file mode 100644 index 000000000..499664fe1 --- /dev/null +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -0,0 +1,17 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.0", [ + {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.0", [ + {load_module, emqx_sn_asleep_timer, brutal_purge, soft_purge, []}, + {load_module, emqx_sn_gateway, brutal_purge, soft_purge, [emqx_sn_asleep_timer]} + ]}, + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl b/apps/emqx_sn/src/emqx_sn_asleep_timer.erl index 56a63ee2f..37ea67689 100644 --- a/apps/emqx_sn/src/emqx_sn_asleep_timer.erl +++ b/apps/emqx_sn/src/emqx_sn_asleep_timer.erl @@ -18,6 +18,7 @@ -export([ init/0 , ensure/2 + , cancel/1 ]). -record(asleep_state, { @@ -42,8 +43,8 @@ init() -> -spec(ensure(undefined | integer(), asleep_state()) -> asleep_state()). ensure(undefined, State = #asleep_state{duration = Duration}) -> ensure(Duration, State); -ensure(Duration, State = #asleep_state{tref = TRef}) -> - _ = cancel(TRef), +ensure(Duration, State) -> + cancel(State), State#asleep_state{duration = Duration, tref = start(Duration)}. %%-------------------------------------------------------------------- @@ -55,6 +56,10 @@ ensure(Duration, State = #asleep_state{tref = TRef}) -> start(Duration) -> erlang:send_after(timer:seconds(Duration), self(), asleep_timeout). -cancel(undefined) -> ok; -cancel(TRef) when is_reference(TRef) -> - erlang:cancel_timer(TRef). +cancel(#asleep_state{tref = Timer}) when is_reference(Timer) -> + case erlang:cancel_timer(Timer) of + false -> + receive {timeout, Timer, _} -> ok after 0 -> ok end; + _ -> ok + end; +cancel(_) -> ok. \ No newline at end of file diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index bfb2f28df..96f849974 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -439,12 +439,11 @@ asleep(cast, {incoming, ?SN_PUBREC_MSG(PubRec, MsgId)}, State) % 4) emq-sn regard this CONNECT as a signal to connected state, not a bootup CONNECT. For this reason, will procedure is lost % this should be a bug in mqtt-sn channel. asleep(cast, {incoming, ?SN_CONNECT_MSG(_Flags, _ProtoId, _Duration, _ClientId)}, - State = #state{keepalive_interval = _Interval}) -> - % device wakeup and goto connected state - % keepalive timer may timeout in asleep state and delete itself, need to restart keepalive - % TODO: Fixme later. - %% self() ! {keepalive, start, Interval}, - {next_state, connected, send_connack(State)}; + State = #state{channel = Channel, asleep_timer = Timer}) -> + NChannel = emqx_channel:ensure_keepalive(#{}, Channel), + emqx_sn_asleep_timer:cancel(Timer), + {next_state, connected, send_connack(State#state{channel = NChannel, + asleep_timer = emqx_sn_asleep_timer:init()})}; asleep(EventType, EventContent, State) -> handle_event(EventType, EventContent, asleep, State). @@ -771,10 +770,13 @@ send_message(Msg = #mqtt_sn_message{type = Type}, goto_asleep_state(State) -> goto_asleep_state(undefined, State). -goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer}) -> +goto_asleep_state(Duration, State=#state{asleep_timer = AsleepTimer, + channel = Channel}) -> ?LOG(debug, "goto_asleep_state Duration=~p", [Duration]), NewTimer = emqx_sn_asleep_timer:ensure(Duration, AsleepTimer), - {next_state, asleep, State#state{asleep_timer = NewTimer}, hibernate}. + NChannel = emqx_channel:clear_keepalive(Channel), + {next_state, asleep, State#state{asleep_timer = NewTimer, + channel = NChannel}, hibernate}. %%-------------------------------------------------------------------- %% Helper funcs diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index ad0c5f032..2972571be 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -856,7 +856,7 @@ t_will_test2(_) -> send_pingreq_msg(Socket, undefined), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), receive_response(Socket), % ignore PUBACK receive_response(Socket), % ignore PUBCOMP @@ -878,7 +878,7 @@ t_will_test3(_) -> send_pingreq_msg(Socket, undefined), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), ?assertEqual(udp_receive_timeout, receive_response(Socket)), @@ -906,7 +906,7 @@ t_will_test4(_) -> send_willmsgupd_msg(Socket, <<"1A2B3C">>), ?assertEqual(<<3, ?SN_WILLMSGRESP, ?SN_RC_ACCEPTED>>, receive_response(Socket)), - timer:sleep(10000), + timer:sleep(4000), receive_response(Socket), % ignore PUBACK @@ -1359,7 +1359,7 @@ t_asleep_test07_to_connected(_) -> timer:sleep(1500), % asleep timer should get timeout, without any effect - timer:sleep(9000), + timer:sleep(4000), % keepalive timer should get timeout gen_udp:close(Socket). @@ -1517,7 +1517,7 @@ t_awake_test01_to_connected(_) -> timer:sleep(1500), % asleep timer should get timeout - timer:sleep(9000), + timer:sleep(4000), % keepalive timer should get timeout gen_udp:close(Socket). diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ef6764241..bde6af882 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -5,7 +5,8 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []} + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []} ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, @@ -13,6 +14,9 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_trie, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, @@ -23,7 +27,8 @@ {load_module, emqx_connection, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, - {load_module, emqx_node_dump, brutal_purge, soft_purge, []} + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []} ]}, {"4.3.0", [ {load_module, emqx_logger_jsonfmt, brutal_purge, soft_purge, []}, @@ -31,9 +36,13 @@ {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_trie, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, %% Just load the module. We don't need to change the 'messages.retained' %% and 'messages.retained' counter type. - {load_module, emqx_metrics, brutal_purge, soft_purge, []} + {load_module, emqx_metrics, brutal_purge, soft_purge, []}, + {apply, {emqx_metrics, upgrade_retained_delayed_counter_type, []}} ]}, {<<".*">>, []} diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c8acccce9..3a818b9a7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -49,7 +49,10 @@ ]). %% Export for emqx_sn --export([do_deliver/2]). +-export([ do_deliver/2 + , ensure_keepalive/2 + , clear_keepalive/1 + ]). %% Exports for CT -export([set_field/3]). @@ -1562,6 +1565,14 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone} Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). +clear_keepalive(Channel = #channel{timers = Timers}) -> + case maps:get(alive_timer, Timers, undefined) of + undefined -> + Channel; + TRef -> + emqx_misc:cancel_timer(TRef), + Channel#channel{timers = maps:without([alive_timer], Timers)} + end. %%-------------------------------------------------------------------- %% Maybe Resume Session