diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 7b23121c5..1eb3a2ea8 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1137,7 +1137,7 @@ handle_call( conninfo = #{proto_ver := ProtoVer} } ) -> - Channel0 = maybe_publish_will_msg(Channel), + Channel0 = maybe_publish_will_msg(kick, Channel), Channel1 = case ConnState of connected -> ensure_disconnected(kicked, Channel0); @@ -1230,8 +1230,9 @@ handle_info( ) when ConnState =:= connected orelse ConnState =:= reauthenticating -> + {Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session), - Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), + Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)), Channel2 = Channel1#channel{session = Session1}, case maybe_shutdown(Reason, Intent, Channel2) of {ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3}; @@ -1425,7 +1426,7 @@ terminate(normal, Channel) -> terminate({shutdown, kicked}, Channel) -> run_terminate_hook(kicked, Channel); terminate(Reason, Channel) -> - Channel1 = maybe_publish_will_msg(Channel), + Channel1 = maybe_publish_will_msg(Reason, Channel), run_terminate_hook(Reason, Channel1). run_terminate_hook(_Reason, #channel{session = undefined}) -> @@ -2217,16 +2218,43 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) -> %%-------------------------------------------------------------------- %% Maybe Publish will msg - -maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) -> +-spec maybe_publish_will_msg(Reason, channel()) -> channel() when + Reason :: kick | sock_closed | {shutdown, atom()}. +maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) -> Channel; -maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> - case will_delay_interval(WillMsg) of - 0 -> - ok = publish_will_msg(ClientInfo, WillMsg), - Channel#channel{will_msg = undefined}; - I -> - ensure_timer(will_message, timer:seconds(I), Channel) +maybe_publish_will_msg( + _Reason, + Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg} +) -> + %% Unconditionally publish will message for MQTT 3.1.1 + ok = publish_will_msg(Channel#channel.clientinfo, WillMsg), + Channel#channel{will_msg = undefined}; +maybe_publish_will_msg( + Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} +) when + Reason =:= {shutdown, expired} +-> + %% Must publish now without delay and cancel the will message timer. + DelayedWillTimer = maps:get(will_message, Timers, undefined), + DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]), + _ = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)}; +maybe_publish_will_msg( + _OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers} +) -> + case maps:get(will_message, Timers, undefined) of + undefined -> + %% May defer the will message publishing + case will_delay_interval(WillMsg) of + 0 -> + ok = publish_will_msg(ClientInfo, WillMsg), + Channel#channel{will_msg = undefined}; + I -> + ensure_timer(will_message, timer:seconds(I), Channel) + end; + %% Will message is already scheduled + _ -> + Channel end. will_delay_interval(WillMsg) ->