improve will message
This commit is contained in:
parent
52eae65983
commit
9bcd4c3e08
|
@ -260,6 +260,7 @@ process_packet(?CONNECT_PACKET(
|
|||
clean_start = CleanStart,
|
||||
keepalive = Keepalive,
|
||||
properties = ConnProps,
|
||||
will_props = WillProps,
|
||||
will_topic = WillTopic,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
|
@ -267,7 +268,16 @@ process_packet(?CONNECT_PACKET(
|
|||
|
||||
%% TODO: Mountpoint...
|
||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||
WillMsg = emqx_packet:will_msg(Connect),
|
||||
Connect1 = if
|
||||
ProtoVer =:= ?MQTT_PROTO_V5 ->
|
||||
WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0),
|
||||
SessionExpiryInterval = get_property('Session-Expiry-Interval', ConnProps, 0),
|
||||
WillProps1 = set_property('Will-Delay-Interval', erlang:min(SessionExpiryInterval, WillDelayInterval), WillProps),
|
||||
Connect#mqtt_packet_connect{will_props = WillProps1};
|
||||
true ->
|
||||
Connect
|
||||
end,
|
||||
WillMsg = emqx_packet:will_msg(Connect1),
|
||||
|
||||
PState1 = set_username(Username,
|
||||
PState#pstate{client_id = ClientId,
|
||||
|
@ -642,6 +652,11 @@ set_property(Name, Value, undefined) ->
|
|||
set_property(Name, Value, Props) ->
|
||||
Props#{Name => Value}.
|
||||
|
||||
get_property(_Name, undefined, Default) ->
|
||||
Default;
|
||||
get_property(Name, Props, Default) ->
|
||||
maps:get(Name, Props, Default).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Check Packet
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -777,20 +792,22 @@ shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
|
|||
emqx_cm:unregister_connection(ClientId);
|
||||
shutdown(Reason, PState = #pstate{connected = true,
|
||||
client_id = ClientId,
|
||||
will_msg = WillMsg}) ->
|
||||
will_msg = WillMsg,
|
||||
session = Session}) ->
|
||||
?LOG(info, "Shutdown for ~p", [Reason], PState),
|
||||
_ = send_willmsg(WillMsg),
|
||||
_ = send_willmsg(WillMsg, Session),
|
||||
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
||||
emqx_cm:unregister_connection(ClientId).
|
||||
|
||||
send_willmsg(undefined) ->
|
||||
send_willmsg(undefined, _Session) ->
|
||||
ignore;
|
||||
send_willmsg(WillMsg = #message{topic = Topic,
|
||||
headers = #{'Will-Delay-Interval' := Interval}})
|
||||
headers = #{'Will-Delay-Interval' := Interval}}, Session)
|
||||
when is_integer(Interval), Interval > 0 ->
|
||||
SendAfter = integer_to_binary(Interval),
|
||||
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
|
||||
send_willmsg(WillMsg) ->
|
||||
Session1 = list_to_binary(pid_to_list(Session)),
|
||||
emqx_broker:publish(WillMsg#message{topic = <<"$will/", Session1/binary, "/", SendAfter/binary, "/", Topic/binary>>});
|
||||
send_willmsg(WillMsg, _Session) ->
|
||||
emqx_broker:publish(WillMsg).
|
||||
|
||||
start_keepalive(0, _PState) ->
|
||||
|
|
|
@ -542,7 +542,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
|||
%% Clean Session: true -> false???
|
||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||
|
||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId, session => self()}, attrs(State)]),
|
||||
|
||||
%% Replay delivery and Dequeue pending messages
|
||||
noreply(dequeue(retry_delivery(true, State1)));
|
||||
|
|
Loading…
Reference in New Issue