fix(mqtt): ensure publish willmsg when session expires
This commit is contained in:
parent
07eec31a8a
commit
9da4896f57
|
@ -1137,7 +1137,7 @@ handle_call(
|
||||||
conninfo = #{proto_ver := ProtoVer}
|
conninfo = #{proto_ver := ProtoVer}
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
Channel0 = maybe_publish_will_msg(Channel),
|
Channel0 = maybe_publish_will_msg(kick, Channel),
|
||||||
Channel1 =
|
Channel1 =
|
||||||
case ConnState of
|
case ConnState of
|
||||||
connected -> ensure_disconnected(kicked, Channel0);
|
connected -> ensure_disconnected(kicked, Channel0);
|
||||||
|
@ -1230,8 +1230,9 @@ handle_info(
|
||||||
) when
|
) when
|
||||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||||
->
|
->
|
||||||
|
|
||||||
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
|
{Intent, Session1} = session_disconnect(ClientInfo, ConnInfo, Session),
|
||||||
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)),
|
Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(sock_closed, Channel)),
|
||||||
Channel2 = Channel1#channel{session = Session1},
|
Channel2 = Channel1#channel{session = Session1},
|
||||||
case maybe_shutdown(Reason, Intent, Channel2) of
|
case maybe_shutdown(Reason, Intent, Channel2) of
|
||||||
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
|
{ok, Channel3} -> {ok, ?REPLY_EVENT(disconnected), Channel3};
|
||||||
|
@ -1425,7 +1426,7 @@ terminate(normal, Channel) ->
|
||||||
terminate({shutdown, kicked}, Channel) ->
|
terminate({shutdown, kicked}, Channel) ->
|
||||||
run_terminate_hook(kicked, Channel);
|
run_terminate_hook(kicked, Channel);
|
||||||
terminate(Reason, Channel) ->
|
terminate(Reason, Channel) ->
|
||||||
Channel1 = maybe_publish_will_msg(Channel),
|
Channel1 = maybe_publish_will_msg(Reason, Channel),
|
||||||
run_terminate_hook(Reason, Channel1).
|
run_terminate_hook(Reason, Channel1).
|
||||||
|
|
||||||
run_terminate_hook(_Reason, #channel{session = undefined}) ->
|
run_terminate_hook(_Reason, #channel{session = undefined}) ->
|
||||||
|
@ -2217,16 +2218,43 @@ session_disconnect(_ClientInfo, _ConnInfo, undefined) ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Maybe Publish will msg
|
%% Maybe Publish will msg
|
||||||
|
-spec maybe_publish_will_msg(Reason, channel()) -> channel() when
|
||||||
maybe_publish_will_msg(Channel = #channel{will_msg = undefined}) ->
|
Reason :: kick | sock_closed | {shutdown, atom()}.
|
||||||
|
maybe_publish_will_msg(_Reason, Channel = #channel{will_msg = undefined}) ->
|
||||||
Channel;
|
Channel;
|
||||||
maybe_publish_will_msg(Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) ->
|
maybe_publish_will_msg(
|
||||||
case will_delay_interval(WillMsg) of
|
_Reason,
|
||||||
0 ->
|
Channel = #channel{conninfo = #{proto_ver := ?MQTT_PROTO_V3}, will_msg = WillMsg}
|
||||||
ok = publish_will_msg(ClientInfo, WillMsg),
|
) ->
|
||||||
Channel#channel{will_msg = undefined};
|
%% Unconditionally publish will message for MQTT 3.1.1
|
||||||
I ->
|
ok = publish_will_msg(Channel#channel.clientinfo, WillMsg),
|
||||||
ensure_timer(will_message, timer:seconds(I), Channel)
|
Channel#channel{will_msg = undefined};
|
||||||
|
maybe_publish_will_msg(
|
||||||
|
Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
||||||
|
) when
|
||||||
|
Reason =:= {shutdown, expired}
|
||||||
|
->
|
||||||
|
%% Must publish now without delay and cancel the will message timer.
|
||||||
|
DelayedWillTimer = maps:get(will_message, Timers, undefined),
|
||||||
|
DelayedWillTimer =/= undefined andalso erlang:cancel_timer(DelayedWillTimer, [{async, true}]),
|
||||||
|
_ = publish_will_msg(ClientInfo, WillMsg),
|
||||||
|
Channel#channel{will_msg = undefined, timers = maps:remove(will_message, Timers)};
|
||||||
|
maybe_publish_will_msg(
|
||||||
|
_OtherReasons, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg, timers = Timers}
|
||||||
|
) ->
|
||||||
|
case maps:get(will_message, Timers, undefined) of
|
||||||
|
undefined ->
|
||||||
|
%% May defer the will message publishing
|
||||||
|
case will_delay_interval(WillMsg) of
|
||||||
|
0 ->
|
||||||
|
ok = publish_will_msg(ClientInfo, WillMsg),
|
||||||
|
Channel#channel{will_msg = undefined};
|
||||||
|
I ->
|
||||||
|
ensure_timer(will_message, timer:seconds(I), Channel)
|
||||||
|
end;
|
||||||
|
%% Will message is already scheduled
|
||||||
|
_ ->
|
||||||
|
Channel
|
||||||
end.
|
end.
|
||||||
|
|
||||||
will_delay_interval(WillMsg) ->
|
will_delay_interval(WillMsg) ->
|
||||||
|
|
Loading…
Reference in New Issue