delayed will message
This commit is contained in:
parent
ed6ffeda91
commit
cf0f55d057
|
@ -373,22 +373,12 @@ init([Options]) ->
|
||||||
{_ver, undefined} -> random_client_id();
|
{_ver, undefined} -> random_client_id();
|
||||||
{_ver, Id} -> iolist_to_binary(Id)
|
{_ver, Id} -> iolist_to_binary(Id)
|
||||||
end,
|
end,
|
||||||
Username = case proplists:get_value(username, Options) of
|
|
||||||
undefined -> <<>>;
|
|
||||||
Name -> Name
|
|
||||||
end,
|
|
||||||
Password = case proplists:get_value(password, Options) of
|
|
||||||
undefined -> <<>>;
|
|
||||||
Passw -> Passw
|
|
||||||
end,
|
|
||||||
State = init(Options, #state{host = {127,0,0,1},
|
State = init(Options, #state{host = {127,0,0,1},
|
||||||
port = 1883,
|
port = 1883,
|
||||||
hosts = [],
|
hosts = [],
|
||||||
sock_opts = [],
|
sock_opts = [],
|
||||||
bridge_mode = false,
|
bridge_mode = false,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
|
||||||
password = Password,
|
|
||||||
clean_start = true,
|
clean_start = true,
|
||||||
proto_ver = ?MQTT_PROTO_V4,
|
proto_ver = ?MQTT_PROTO_V4,
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
|
@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) ->
|
||||||
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
|
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
|
||||||
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
|
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
|
||||||
init(Opts, State#state{clean_start = CleanStart});
|
init(Opts, State#state{clean_start = CleanStart});
|
||||||
init([{useranme, Username} | Opts], State) ->
|
init([{username, Username} | Opts], State) ->
|
||||||
init(Opts, State#state{username = iolist_to_binary(Username)});
|
init(Opts, State#state{username = iolist_to_binary(Username)});
|
||||||
init([{passwrod, Password} | Opts], State) ->
|
init([{password, Password} | Opts], State) ->
|
||||||
init(Opts, State#state{password = iolist_to_binary(Password)});
|
init(Opts, State#state{password = iolist_to_binary(Password)});
|
||||||
init([{keepalive, Secs} | Opts], State) ->
|
init([{keepalive, Secs} | Opts], State) ->
|
||||||
init(Opts, State#state{keepalive = timer:seconds(Secs)});
|
init(Opts, State#state{keepalive = timer:seconds(Secs)});
|
||||||
|
@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId,
|
||||||
properties = Properties}) ->
|
properties = Properties}) ->
|
||||||
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
|
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
|
||||||
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
|
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
|
||||||
io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n",
|
|
||||||
[ConnProps, ClientId, Username, Password]),
|
|
||||||
send(?CONNECT_PACKET(
|
send(?CONNECT_PACKET(
|
||||||
#mqtt_packet_connect{proto_ver = ProtoVer,
|
#mqtt_packet_connect{proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
|
|
|
@ -264,7 +264,6 @@ process_packet(?CONNECT_PACKET(
|
||||||
%% TODO: Mountpoint...
|
%% TODO: Mountpoint...
|
||||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||||
WillMsg = emqx_packet:will_msg(Connect),
|
WillMsg = emqx_packet:will_msg(Connect),
|
||||||
|
|
||||||
PState1 = set_username(Username,
|
PState1 = set_username(Username,
|
||||||
PState#pstate{client_id = ClientId,
|
PState#pstate{client_id = ClientId,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
|
@ -656,18 +655,23 @@ shutdown(conflict, #pstate{client_id = ClientId}) ->
|
||||||
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) ->
|
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) ->
|
||||||
emqx_cm:unregister_connection(ClientId),
|
emqx_cm:unregister_connection(ClientId),
|
||||||
ignore;
|
ignore;
|
||||||
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
|
shutdown(Error, PState = #pstate{connected = Connected,
|
||||||
|
client_id = ClientId,
|
||||||
|
will_msg = WillMsg}) ->
|
||||||
?LOG(info, "Shutdown for ~p", [Error], PState),
|
?LOG(info, "Shutdown for ~p", [Error], PState),
|
||||||
%% TODO: Auth failure not publish the will message
|
case Connected of
|
||||||
case Error =:= auth_failure of
|
false -> ok;
|
||||||
true -> ok;
|
true -> send_willmsg(WillMsg)
|
||||||
false -> send_willmsg(WillMsg)
|
|
||||||
end,
|
end,
|
||||||
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
|
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
|
||||||
emqx_cm:unregister_connection(ClientId).
|
emqx_cm:unregister_connection(ClientId).
|
||||||
|
|
||||||
send_willmsg(undefined) ->
|
send_willmsg(undefined) ->
|
||||||
ignore;
|
ignore;
|
||||||
|
send_willmsg(WillMsg = #message{topic = Topic,
|
||||||
|
headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) ->
|
||||||
|
SendAfter = integer_to_binary(Interval),
|
||||||
|
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
|
||||||
send_willmsg(WillMsg) ->
|
send_willmsg(WillMsg) ->
|
||||||
emqx_broker:publish(WillMsg).
|
emqx_broker:publish(WillMsg).
|
||||||
|
|
||||||
|
@ -709,4 +713,3 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
|
||||||
|
|
||||||
sp(true) -> 1;
|
sp(true) -> 1;
|
||||||
sp(false) -> 0.
|
sp(false) -> 0.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue