presence and misc fix
This commit is contained in:
parent
230a348f51
commit
5a2dfd2a10
|
@ -100,7 +100,6 @@ handle_info(timeout, State) ->
|
||||||
|
|
||||||
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
||||||
conn_name=ConnName}) ->
|
conn_name=ConnName}) ->
|
||||||
%% TODO: to...
|
|
||||||
%% need transfer data???
|
%% need transfer data???
|
||||||
%% emqttd_client:transfer(NewPid, Data),
|
%% emqttd_client:transfer(NewPid, Data),
|
||||||
lager:error("Shutdown for duplicate clientid: ~s, conn:~s",
|
lager:error("Shutdown for duplicate clientid: ~s, conn:~s",
|
||||||
|
|
|
@ -43,24 +43,26 @@ client_connected(ConnAck, #mqtt_client{clientid = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
ipaddress = IpAddress,
|
ipaddress = IpAddress,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
proto_ver = ProtoVer}, _Opts) ->
|
proto_ver = ProtoVer}, Opts) ->
|
||||||
Sess = case CleanSess of
|
Sess = case CleanSess of
|
||||||
true -> false;
|
true -> false;
|
||||||
false -> true
|
false -> true
|
||||||
end,
|
end,
|
||||||
Json = mochijson2:encode([{username, Username},
|
Json = mochijson2:encode([{username, Username},
|
||||||
{ipaddress, emqttd_net:ntoa(IpAddress)},
|
{ipaddress, list_to_binary(emqttd_net:ntoa(IpAddress))},
|
||||||
{session, Sess},
|
{session, Sess},
|
||||||
{protocol, ProtoVer},
|
{protocol, ProtoVer},
|
||||||
{connack, ConnAck},
|
{connack, ConnAck},
|
||||||
{ts, emqttd_vm:timestamp()}]),
|
{ts, emqttd_vm:timestamp()}]),
|
||||||
Message = #mqtt_message{topic = topic(connected, ClientId),
|
Message = #mqtt_message{qos = proplists:get_value(qos, Opts, 0),
|
||||||
|
topic = topic(connected, ClientId),
|
||||||
payload = iolist_to_binary(Json)},
|
payload = iolist_to_binary(Json)},
|
||||||
emqttd_pubsub:publish(presence, Message).
|
emqttd_pubsub:publish(presence, Message).
|
||||||
|
|
||||||
client_disconnected(Reason, ClientId, _Opts) ->
|
client_disconnected(Reason, ClientId, Opts) ->
|
||||||
Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]),
|
Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]),
|
||||||
emqttd_pubsub:publish(presence, #mqtt_message{topic = topic(disconnected, ClientId),
|
emqttd_pubsub:publish(presence, #mqtt_message{qos = proplists:get_value(qos, Opts, 0),
|
||||||
|
topic = topic(disconnected, ClientId),
|
||||||
payload = iolist_to_binary(Json)}).
|
payload = iolist_to_binary(Json)}).
|
||||||
|
|
||||||
unload(_Opts) ->
|
unload(_Opts) ->
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
|
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
|
||||||
|
|
||||||
-export([peername/1, sockname/1, format/2, format/1, connection_string/2]).
|
-export([peername/1, sockname/1, format/2, format/1, connection_string/2, ntoa/1]).
|
||||||
|
|
||||||
-define(FIRST_TEST_BIND_PORT, 10000).
|
-define(FIRST_TEST_BIND_PORT, 10000).
|
||||||
|
|
||||||
|
|
|
@ -121,7 +121,7 @@ received(Packet = ?PACKET(_Type), State) ->
|
||||||
{error, Reason, State}
|
{error, Reason, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername = {Addr, _}}) ->
|
handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
|
||||||
|
|
||||||
#mqtt_packet_connect{proto_ver = ProtoVer,
|
#mqtt_packet_connect{proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
|
@ -299,6 +299,9 @@ redeliver({?PUBREL, PacketId}, State) ->
|
||||||
shutdown(duplicate_id, _State) ->
|
shutdown(duplicate_id, _State) ->
|
||||||
quiet; %%
|
quiet; %%
|
||||||
|
|
||||||
|
shutdown(_, #proto_state{clientid = undefined}) ->
|
||||||
|
ignore;
|
||||||
|
|
||||||
shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) ->
|
shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) ->
|
||||||
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
|
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
|
||||||
[ClientId, emqttd_net:format(Peername)]),
|
[ClientId, emqttd_net:format(Peername)]),
|
||||||
|
|
|
@ -360,7 +360,7 @@ handle_info({dispatch, {_From, Message}}, State) ->
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
|
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
|
||||||
client_pid = ClientPid}) ->
|
client_pid = ClientPid}) ->
|
||||||
lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
|
lager:info("Session: client ~s@~p exited for ~p", [ClientId, ClientPid, Reason]),
|
||||||
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
{noreply, start_expire_timer(State#session_state{client_pid = undefined})};
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) ->
|
handle_info({'EXIT', ClientPid0, _Reason}, State = #session_state{client_pid = ClientPid}) ->
|
||||||
|
|
|
@ -124,7 +124,7 @@
|
||||||
{modules, [
|
{modules, [
|
||||||
%% Client presence management module.
|
%% Client presence management module.
|
||||||
%% Publish messages when client connected or disconnected
|
%% Publish messages when client connected or disconnected
|
||||||
{presence, []},
|
{presence, [{qos, 0}]},
|
||||||
|
|
||||||
%% Subscribe topics automatically when client connected
|
%% Subscribe topics automatically when client connected
|
||||||
{autosub, [{"$Q/client/$c", 0}]}
|
{autosub, [{"$Q/client/$c", 0}]}
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 17afdf7fb8e148f376d63592bbf3dd4ccdf19e84
|
Subproject commit bdc690db847cec1c682e604dca571c72ff756305
|
Loading…
Reference in New Issue