From cf0f55d057607b1d2ae21b25d6a8f010eb9d0ca9 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 30 Aug 2018 21:43:23 +0800 Subject: [PATCH] delayed will message --- src/emqx_client.erl | 16 ++-------------- src/emqx_protocol.erl | 17 ++++++++++------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 82e331abd..5c50519bc 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -373,22 +373,12 @@ init([Options]) -> {_ver, undefined} -> random_client_id(); {_ver, Id} -> iolist_to_binary(Id) end, - Username = case proplists:get_value(username, Options) of - undefined -> <<>>; - Name -> Name - end, - Password = case proplists:get_value(password, Options) of - undefined -> <<>>; - Passw -> Passw - end, State = init(Options, #state{host = {127,0,0,1}, port = 1883, hosts = [], sock_opts = [], bridge_mode = false, client_id = ClientId, - username = Username, - password = Password, clean_start = true, proto_ver = ?MQTT_PROTO_V4, proto_name = <<"MQTT">>, @@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) -> init(Opts, State#state{client_id = iolist_to_binary(ClientId)}); init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> init(Opts, State#state{clean_start = CleanStart}); -init([{useranme, Username} | Opts], State) -> +init([{username, Username} | Opts], State) -> init(Opts, State#state{username = iolist_to_binary(Username)}); -init([{passwrod, Password} | Opts], State) -> +init([{password, Password} | Opts], State) -> init(Opts, State#state{password = iolist_to_binary(Password)}); init([{keepalive, Secs} | Opts], State) -> init(Opts, State#state{keepalive = timer:seconds(Secs)}); @@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId, properties = Properties}) -> ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), - io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n", - [ConnProps, ClientId, Username, Password]), send(?CONNECT_PACKET( #mqtt_packet_connect{proto_ver = ProtoVer, proto_name = ProtoName, diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 9ee24609f..054bcdf16 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -264,7 +264,6 @@ process_packet(?CONNECT_PACKET( %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = emqx_packet:will_msg(Connect), - PState1 = set_username(Username, PState#pstate{client_id = ClientId, proto_ver = ProtoVer, @@ -656,18 +655,23 @@ shutdown(conflict, #pstate{client_id = ClientId}) -> shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> emqx_cm:unregister_connection(ClientId), ignore; -shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> +shutdown(Error, PState = #pstate{connected = Connected, + client_id = ClientId, + will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], PState), - %% TODO: Auth failure not publish the will message - case Error =:= auth_failure of - true -> ok; - false -> send_willmsg(WillMsg) + case Connected of + false -> ok; + true -> send_willmsg(WillMsg) end, emqx_hooks:run('client.disconnected', [credentials(PState), Error]), emqx_cm:unregister_connection(ClientId). send_willmsg(undefined) -> ignore; +send_willmsg(WillMsg = #message{topic = Topic, + headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) -> + SendAfter = integer_to_binary(Interval), + emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>}); send_willmsg(WillMsg) -> emqx_broker:publish(WillMsg). @@ -709,4 +713,3 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) -> sp(true) -> 1; sp(false) -> 0. -