Merge pull request #1772 from terry-xiaoyu/fix_terminate

delayed will message
This commit is contained in:
Shawn 2018-08-30 23:34:50 +08:00 committed by GitHub
commit bbe629d303
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 22 deletions

View File

@ -373,22 +373,12 @@ init([Options]) ->
{_ver, undefined} -> random_client_id();
{_ver, Id} -> iolist_to_binary(Id)
end,
Username = case proplists:get_value(username, Options) of
undefined -> <<>>;
Name -> Name
end,
Password = case proplists:get_value(password, Options) of
undefined -> <<>>;
Passw -> Passw
end,
State = init(Options, #state{host = {127,0,0,1},
port = 1883,
hosts = [],
sock_opts = [],
bridge_mode = false,
client_id = ClientId,
username = Username,
password = Password,
clean_start = true,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) ->
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
init(Opts, State#state{clean_start = CleanStart});
init([{useranme, Username} | Opts], State) ->
init([{username, Username} | Opts], State) ->
init(Opts, State#state{username = iolist_to_binary(Username)});
init([{passwrod, Password} | Opts], State) ->
init([{password, Password} | Opts], State) ->
init(Opts, State#state{password = iolist_to_binary(Password)});
init([{keepalive, Secs} | Opts], State) ->
init(Opts, State#state{keepalive = timer:seconds(Secs)});
@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId,
properties = Properties}) ->
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n",
[ConnProps, ClientId, Username, Password]),
send(?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName,

View File

@ -264,7 +264,6 @@ process_packet(?CONNECT_PACKET(
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = emqx_packet:will_msg(Connect),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
@ -656,18 +655,23 @@ shutdown(conflict, #pstate{client_id = ClientId}) ->
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) ->
emqx_cm:unregister_connection(ClientId),
ignore;
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
shutdown(Error, PState = #pstate{connected = false}) ->
?LOG(info, "Shutdown for ~p", [Error], PState),
%% TODO: Auth failure not publish the will message
case Error =:= auth_failure of
true -> ok;
false -> send_willmsg(WillMsg)
end,
ignore;
shutdown(Error, PState = #pstate{connected = true,
client_id = ClientId,
will_msg = WillMsg}) ->
?LOG(info, "Shutdown for ~p", [Error], PState),
send_willmsg(WillMsg),
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
emqx_cm:unregister_connection(ClientId).
send_willmsg(undefined) ->
ignore;
send_willmsg(WillMsg = #message{topic = Topic,
headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) ->
SendAfter = integer_to_binary(Interval),
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
send_willmsg(WillMsg) ->
emqx_broker:publish(WillMsg).
@ -709,4 +713,3 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
sp(true) -> 1;
sp(false) -> 0.