peername
This commit is contained in:
parent
f9027ed1f4
commit
3f41a6c241
|
@ -47,17 +47,16 @@
|
||||||
code_change/3, terminate/2]).
|
code_change/3, terminate/2]).
|
||||||
|
|
||||||
%% Client State
|
%% Client State
|
||||||
-record(client_state, {connection, peername, peerhost, peerport,
|
-record(client_state, {connection, connname, peername, peerhost, peerport,
|
||||||
await_recv, conn_state, rate_limit,
|
await_recv, conn_state, rate_limit, parser_fun,
|
||||||
parser_fun, proto_state, packet_opts,
|
proto_state, packet_opts, keepalive}).
|
||||||
keepalive}).
|
|
||||||
|
|
||||||
-define(INFO_KEYS, [peername, peerhost, peerport, await_recv, conn_state]).
|
-define(INFO_KEYS, [peername, peerhost, peerport, await_recv, conn_state]).
|
||||||
|
|
||||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args, State),
|
-define(LOG(Level, Format, Args, State),
|
||||||
lager:Level("Client(~s): " ++ Format, [State#client_state.peername | Args])).
|
lager:Level("Client(~s): " ++ Format, [State#client_state.connname | Args])).
|
||||||
|
|
||||||
start_link(Connection, MqttEnv) ->
|
start_link(Connection, MqttEnv) ->
|
||||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Connection, MqttEnv]])}.
|
{ok, proc_lib:spawn_link(?MODULE, init, [[Connection, MqttEnv]])}.
|
||||||
|
@ -81,8 +80,8 @@ init([Connection0, MqttEnv]) ->
|
||||||
{ok, Connection} = Connection0:wait(),
|
{ok, Connection} = Connection0:wait(),
|
||||||
{PeerHost, PeerPort, PeerName} =
|
{PeerHost, PeerPort, PeerName} =
|
||||||
case Connection:peername() of
|
case Connection:peername() of
|
||||||
{ok, {Host, Port}} ->
|
{ok, Peer = {Host, Port}} ->
|
||||||
{Host, Port, esockd_net:format({Host, Port})};
|
{Host, Port, Peer};
|
||||||
{error, enotconn} ->
|
{error, enotconn} ->
|
||||||
Connection:fast_close(),
|
Connection:fast_close(),
|
||||||
exit(normal);
|
exit(normal);
|
||||||
|
@ -90,6 +89,7 @@ init([Connection0, MqttEnv]) ->
|
||||||
Connection:fast_close(),
|
Connection:fast_close(),
|
||||||
exit({shutdown, Reason})
|
exit({shutdown, Reason})
|
||||||
end,
|
end,
|
||||||
|
ConnName = esockd_net:format(PeerName),
|
||||||
SendFun = fun(Data) ->
|
SendFun = fun(Data) ->
|
||||||
try Connection:async_send(Data) of
|
try Connection:async_send(Data) of
|
||||||
true -> ok
|
true -> ok
|
||||||
|
@ -102,6 +102,7 @@ init([Connection0, MqttEnv]) ->
|
||||||
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
|
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
|
||||||
RateLimit = proplists:get_value(rate_limit, Connection:opts()),
|
RateLimit = proplists:get_value(rate_limit, Connection:opts()),
|
||||||
State = run_socket(#client_state{connection = Connection,
|
State = run_socket(#client_state{connection = Connection,
|
||||||
|
connname = ConnName,
|
||||||
peername = PeerName,
|
peername = PeerName,
|
||||||
peerhost = PeerHost,
|
peerhost = PeerHost,
|
||||||
peerport = PeerPort,
|
peerport = PeerPort,
|
||||||
|
|
|
@ -74,13 +74,15 @@ connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
|
||||||
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||||
format_header(Header, format_variable(Variable, Payload)).
|
format_header(Header, format_variable(Variable, Payload)).
|
||||||
|
|
||||||
format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) ->
|
format_header(#mqtt_packet_header{type = Type,
|
||||||
S1 =
|
dup = Dup,
|
||||||
if
|
qos = QoS,
|
||||||
S == undefined -> <<>>;
|
retain = Retain}, S) ->
|
||||||
true -> [", ", S]
|
S1 = if
|
||||||
end,
|
S == undefined -> <<>>;
|
||||||
io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s~s)", [type_name(Type), QoS, Retain, Dup, S1]).
|
true -> [", ", S]
|
||||||
|
end,
|
||||||
|
io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
|
||||||
|
|
||||||
format_variable(undefined, _) ->
|
format_variable(undefined, _) ->
|
||||||
undefined;
|
undefined;
|
||||||
|
@ -105,8 +107,8 @@ format_variable(#mqtt_packet_connect{
|
||||||
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
|
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
|
||||||
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)],
|
Args = [ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, format_password(Password)],
|
||||||
{Format1, Args1} = if
|
{Format1, Args1} = if
|
||||||
WillFlag -> { Format ++ ", Will(Qos=~p, Retain=~s, Topic=~s, Msg=~s)",
|
WillFlag -> { Format ++ ", Will(Q~p, R~p, Topic=~s, Msg=~s)",
|
||||||
Args ++ [ WillQoS, WillRetain, WillTopic, WillMsg ] };
|
Args ++ [WillQoS, i(WillRetain), WillTopic, WillMsg] };
|
||||||
true -> {Format, Args}
|
true -> {Format, Args}
|
||||||
end,
|
end,
|
||||||
io_lib:format(Format1, Args1);
|
io_lib:format(Format1, Args1);
|
||||||
|
@ -145,3 +147,6 @@ format_variable(undefined) -> undefined.
|
||||||
format_password(undefined) -> undefined;
|
format_password(undefined) -> undefined;
|
||||||
format_password(_Password) -> '******'.
|
format_password(_Password) -> '******'.
|
||||||
|
|
||||||
|
i(true) -> 1;
|
||||||
|
i(false) -> 0;
|
||||||
|
i(I) when is_integer(I) -> I.
|
||||||
|
|
|
@ -56,7 +56,7 @@
|
||||||
|
|
||||||
-define(LOG(Level, Format, Args, State),
|
-define(LOG(Level, Format, Args, State),
|
||||||
lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
|
lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
|
||||||
[State#proto_state.client_id, State#proto_state.peername | Args])).
|
[State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Init protocol
|
%% @doc Init protocol
|
||||||
|
@ -269,14 +269,10 @@ send(Packet, State = #proto_state{sendfun = SendFun})
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
trace(recv, Packet, ProtoState) ->
|
trace(recv, Packet, ProtoState) ->
|
||||||
trace2("RECV <-", Packet, ProtoState);
|
?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
|
||||||
|
|
||||||
trace(send, Packet, ProtoState) ->
|
trace(send, Packet, ProtoState) ->
|
||||||
trace2("SEND ->", Packet, ProtoState).
|
?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
|
||||||
|
|
||||||
trace2(Tag, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
|
|
||||||
lager:info([{client, ClientId}], "Client(~s@~s): ~s ~s",
|
|
||||||
[ClientId, Peername, Tag, emqttd_packet:format(Packet)]).
|
|
||||||
|
|
||||||
%% @doc redeliver PUBREL PacketId
|
%% @doc redeliver PUBREL PacketId
|
||||||
redeliver({?PUBREL, PacketId}, State) ->
|
redeliver({?PUBREL, PacketId}, State) ->
|
||||||
|
@ -289,7 +285,7 @@ shutdown(confict, #proto_state{client_id = ClientId}) ->
|
||||||
emqttd_cm:unregister(ClientId);
|
emqttd_cm:unregister(ClientId);
|
||||||
|
|
||||||
shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
|
shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
|
||||||
?LOG(info, "shutdown for ~p", [Error], State),
|
?LOG(info, "Shutdown for ~p", [Error], State),
|
||||||
send_willmsg(ClientId, WillMsg),
|
send_willmsg(ClientId, WillMsg),
|
||||||
emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
|
emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
|
||||||
emqttd_cm:unregister(ClientId).
|
emqttd_cm:unregister(ClientId).
|
||||||
|
|
Loading…
Reference in New Issue