From f3a92f35f6f4d0202cdeaec57a041204ad5cedb3 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 23 Aug 2019 13:21:45 +0800 Subject: [PATCH] Handle will message correctly --- src/emqx_channel.erl | 29 ++++++++++++++++++++++------- src/emqx_protocol.erl | 23 ++++++++++++++++++++++- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 137770c73..38e3513cb 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -87,7 +87,8 @@ alive_timer => keepalive, retry_timer => retry_delivery, await_timer => expire_awaiting_rel, - expire_timer => expire_session + expire_timer => expire_session, + will_timer => will_message }). %%-------------------------------------------------------------------- @@ -342,16 +343,18 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi _ -> Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session)} end), case Interval of - ?UINT_MAX -> {ok, NChannel}; - Int when Int > 0 -> {ok, ensure_timer(expire_timer, NChannel)}; + ?UINT_MAX -> + {ok, ensure_timer(will_timer, NChannel)}; + Int when Int > 0 -> + {ok, ensure_timer([will_timer, expire_timer], NChannel)}; _Other -> Reason = case RC of - ?RC_SUCCESS -> closed; + ?RC_SUCCESS -> normal; _ -> Ver = emqx_protocol:info(proto_ver, Protocol), emqx_reason_codes:name(RC, Ver) end, - {stop, {shutdown, Reason}, Channel} + {stop, {shutdown, Reason}, NChannel} end end; @@ -680,9 +683,14 @@ timeout(TRef, expire_awaiting_rel, Channel = #channel{session = Session, {ok, reset_timer(await_timer, Timeout, Channel#channel{session = Session})} end; -timeout(_TRef, expire_session, Channel) -> +timeout(TRef, expire_session, Channel = #channel{timers = #{expire_timer := TRef}}) -> shutdown(expired, Channel); +timeout(TRef, will_message, Channel = #channel{protocol = Protocol, + timers = #{will_timer := TRef}}) -> + publish_will_msg(emqx_protocol:info(will_msg, Protocol)), + {ok, clean_timer(will_timer, Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)})}; + timeout(_TRef, Msg, Channel) -> ?LOG(error, "Unexpected timeout: ~p~n", [Msg]), {ok, Channel}. @@ -691,6 +699,11 @@ timeout(_TRef, Msg, Channel) -> %% Ensure timers %%-------------------------------------------------------------------- +ensure_timer([Name], Channel) -> + ensure_timer(Name, Channel); +ensure_timer([Name | Rest], Channel) -> + ensure_timer(Rest, ensure_timer(Name, Channel)); + ensure_timer(Name, Channel = #channel{timers = Timers}) -> TRef = maps:get(Name, Timers, undefined), Time = interval(Name, Channel), @@ -723,7 +736,9 @@ interval(retry_timer, #channel{session = Session}) -> interval(await_timer, #channel{session = Session}) -> emqx_session:info(await_rel_timeout, Session); interval(expire_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(expiry_interval, Session)). + timer:seconds(emqx_session:info(expiry_interval, Session)); +interval(will_timer, #channel{protocol = Protocol}) -> + timer:seconds(emqx_protocol:info(will_delay_interval, Protocol)). %%-------------------------------------------------------------------- %% Terminate diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 8bb5df7bf..1007db0b5 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -59,13 +59,22 @@ -spec(init(#mqtt_packet_connect{}) -> protocol()). init(#mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, + will_props = WillProps, clean_start = CleanStart, keepalive = Keepalive, properties = Properties, client_id = ClientId, username = Username } = ConnPkt) -> - WillMsg = emqx_packet:will_msg(ConnPkt), + WillMsg = emqx_packet:will_msg( + case ProtoVer of + ?MQTT_PROTO_V5 -> + WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), + ConnPkt#mqtt_packet_connect{ + will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; + _ -> + ConnPkt + end), #protocol{proto_name = ProtoName, proto_ver = ProtoVer, clean_start = CleanStart, @@ -110,6 +119,8 @@ info(username, #protocol{username = Username}) -> Username; info(will_msg, #protocol{will_msg = WillMsg}) -> WillMsg; +info(will_delay_interval, #protocol{will_msg = WillMsg}) -> + emqx_message:get_header('Will-Delay-Interval', WillMsg, 0); info(conn_props, #protocol{conn_props = ConnProps}) -> ConnProps; info(topic_aliases, #protocol{topic_aliases = Aliases}) -> @@ -137,3 +148,13 @@ save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) -> clear_will_msg(Protocol) -> Protocol#protocol{will_msg = undefined}. + +set_property(Name, Value, undefined) -> + #{Name => Value}; +set_property(Name, Value, Props) -> + Props#{Name => Value}. + +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default).