diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1fb80c0ce..0a5858e44 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -260,6 +260,7 @@ process_packet(?CONNECT_PACKET( clean_start = CleanStart, keepalive = Keepalive, properties = ConnProps, + will_props = WillProps, will_topic = WillTopic, client_id = ClientId, username = Username, @@ -267,7 +268,16 @@ process_packet(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) - WillMsg = emqx_packet:will_msg(Connect), + Connect1 = if + ProtoVer =:= ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + SessionExpiryInterval = get_property('Session-Expiry-Interval', ConnProps, 0), + WillProps1 = set_property('Will-Delay-Interval', erlang:min(SessionExpiryInterval, WillDelayInterval), WillProps), + Connect#mqtt_packet_connect{will_props = WillProps1}; + true -> + Connect + end, + WillMsg = emqx_packet:will_msg(Connect1), PState1 = set_username(Username, PState#pstate{client_id = ClientId, @@ -642,6 +652,11 @@ set_property(Name, Value, undefined) -> set_property(Name, Value, Props) -> Props#{Name => Value}. +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default). + %%------------------------------------------------------------------------------ %% Check Packet %%------------------------------------------------------------------------------ @@ -777,20 +792,22 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict; emqx_cm:unregister_connection(ClientId); shutdown(Reason, PState = #pstate{connected = true, client_id = ClientId, - will_msg = WillMsg}) -> + will_msg = WillMsg, + session = Session}) -> ?LOG(info, "Shutdown for ~p", [Reason], PState), - _ = send_willmsg(WillMsg), + _ = send_willmsg(WillMsg, Session), emqx_hooks:run('client.disconnected', [credentials(PState), Reason]), emqx_cm:unregister_connection(ClientId). -send_willmsg(undefined) -> +send_willmsg(undefined, _Session) -> ignore; send_willmsg(WillMsg = #message{topic = Topic, - headers = #{'Will-Delay-Interval' := Interval}}) + headers = #{'Will-Delay-Interval' := Interval}}, Session) when is_integer(Interval), Interval > 0 -> SendAfter = integer_to_binary(Interval), - emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); -send_willmsg(WillMsg) -> + Session1 = list_to_binary(pid_to_list(Session)), + emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>}); +send_willmsg(WillMsg, _Session) -> emqx_broker:publish(WillMsg). start_keepalive(0, _PState) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d327723f5..b63b45b18 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, %% Clean Session: true -> false??? CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), - emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), + emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]), %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1)));