authentication with clientid

This commit is contained in:
Ery Lee 2015-04-07 17:12:12 +08:00
parent 920547b9c5
commit 174226c0b0
7 changed files with 129 additions and 82 deletions

View File

@ -1,7 +1,7 @@
{application, emqttd, {application, emqttd,
[ [
{description, "Erlang MQTT Broker"}, {description, "Erlang MQTT Broker"},
{vsn, "0.5.4"}, {vsn, "0.6.0"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel, {applications, [kernel,

View File

@ -41,7 +41,7 @@
-define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid). -define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid).
-record(?AUTH_CLIENTID_TABLE, {clientid, password = undefined}). -record(?AUTH_CLIENTID_TABLE, {clientid, ipaddr, password}).
add_clientid(ClientId) when is_binary(ClientId) -> add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{clientid = ClientId}, R = #mqtt_auth_clientid{clientid = ClientId},
@ -62,17 +62,24 @@ remove_clientid(ClientId) ->
init(Opts) -> init(Opts) ->
mnesia:create_table(?AUTH_CLIENTID_TABLE, [ mnesia:create_table(?AUTH_CLIENTID_TABLE, [
{type, set}, {ram_copies, [node()]},
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]), {attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), disc_copies), mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies),
case proplists:get_value(file, Opts) of
undefined -> ok;
File -> load(File)
end,
{ok, Opts}. {ok, Opts}.
check(#mqtt_user{clientid = ClientId}, _Password, []) -> check(#mqtt_user{clientid = undefined}, _Password, []) ->
check_clientid_only(ClientId); {error, "ClientId undefined"};
check(#mqtt_user{clientid = ClientId}, _Password, [{password, no}|_]) -> check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, []) ->
check_clientid_only(ClientId); check_clientid_only(ClientId, IpAddr);
check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) -> check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddr);
check(_User, undefined, [{password, yes}|_]) ->
{error, "Password undefined"};
check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
[] -> {error, "ClientId Not Found"}; [] -> {error, "ClientId Not Found"};
[#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext?? [#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext??
@ -85,10 +92,41 @@ description() -> "ClientId authentication module".
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
check_clientid_only(ClientId) -> load(File) ->
{ok, Fd} = file:open(File, [read]),
load(Fd, file:read_line(Fd), []).
load(Fd, {ok, Line}, Clients) when is_list(Line) ->
Clients1 =
case string:tokens(Line, " ") of
[ClientIdS] ->
ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)),
[#mqtt_auth_clientid{clientid = ClientId} | Clients];
[ClientId, IpAddr0] ->
IpAddr = string:strip(IpAddr0, right, $\n),
Range = esockd_access:range(IpAddr),
[#mqtt_auth_clientid{clientid = list_to_binary(ClientId),
ipaddr = {IpAddr, Range}}|Clients];
BadLine ->
lager:error("BadLine in clients.config: ~s", [BadLine]),
Clients
end,
load(Fd, file:read_line(Fd), Clients1);
load(Fd, eof, Clients) ->
mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end),
file:close(Fd).
check_clientid_only(ClientId, IpAddr) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
[] -> {error, "ClientId Not Found"}; [] -> {error, "ClientId Not Found"};
_ -> ok [#?AUTH_CLIENTID_TABLE{ipaddr = undefined}] -> ok;
[#?AUTH_CLIENTID_TABLE{ipaddr = {_, {Start, End}}}] ->
I = esockd_access:atoi(IpAddr),
case I >= Start andalso I =< End of
true -> ok;
false -> {error, "ClientId with wrong IP address"}
end
end. end.

View File

@ -62,18 +62,21 @@ all_users() ->
%%%============================================================================= %%%=============================================================================
init(Opts) -> init(Opts) ->
mnesia:create_table(?AUTH_USERNAME_TABLE, [ mnesia:create_table(?AUTH_USERNAME_TABLE, [
{type, set}, {ram_copies, [node()]},
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]), {attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), disc_copies), mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies),
{ok, Opts}. {ok, Opts}.
check(#mqtt_user{username = undefined}, _Password, _Opts) ->
{error, "Username undefined"};
check(_User, undefined, _Opts) ->
{error, "Password undefined"};
check(#mqtt_user{username = Username}, Password, _Opts) -> check(#mqtt_user{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of
[] -> [] ->
{error, "Username Not Found"}; {error, "Username Not Found"};
[#?AUTH_USERNAME_TABLE{password = <<Salt:4/binary, Hash>>}] -> [#?AUTH_USERNAME_TABLE{password = <<Salt:4/binary, Hash/binary>>}] ->
case Hash =:= hash(Salt, Password) of case Hash =:= md5_hash(Salt, Password) of
true -> ok; true -> ok;
false -> {error, "Password Not Right"} false -> {error, "Password Not Right"}
end end
@ -86,11 +89,11 @@ description() -> "Username password authentication module".
%%%============================================================================= %%%=============================================================================
hash(Password) -> hash(Password) ->
hash(salt(), Password). SaltBin = salt(),
<<SaltBin/binary, (md5_hash(SaltBin, Password))/binary>>.
hash(SaltBin, Password) -> md5_hash(SaltBin, Password) ->
Hash = erlang:md5(<<SaltBin/binary, Password/binary>>), erlang:md5(<<SaltBin/binary, Password/binary>>).
<<SaltBin/binary, Hash/binary>>.
salt() -> salt() ->
{A1,A2,A3} = now(), {A1,A2,A3} = now(),

View File

@ -46,7 +46,7 @@
%%Client State... %%Client State...
-record(state, {transport, -record(state, {transport,
socket, socket,
peer_name, peername,
conn_name, conn_name,
await_recv, await_recv,
conn_state, conn_state,
@ -66,14 +66,14 @@ info(Pid) ->
init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
%transform if ssl. %transform if ssl.
{ok, NewSock} = esockd_connection:accept(SockArgs), {ok, NewSock} = esockd_connection:accept(SockArgs),
{ok, Peername} = emqttd_net:peer_string(Sock), {ok, Peername} = emqttd_net:peername(Sock),
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
lager:info("Connect from ~s", [ConnStr]), lager:info("Connect from ~s", [ConnStr]),
ParserState = emqttd_parser:init(PacketOpts), ParserState = emqttd_parser:init(PacketOpts),
ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
State = control_throttle(#state{transport = Transport, State = control_throttle(#state{transport = Transport,
socket = NewSock, socket = NewSock,
peer_name = Peername, peername = Peername,
conn_name = ConnStr, conn_name = ConnStr,
await_recv = false, await_recv = false,
conn_state = running, conn_state = running,
@ -118,8 +118,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
handle_info({inet_reply, _Ref, ok}, State) -> handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) -> handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
lager:debug("RECV from ~s: ~p", [PeerName, Data]), lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
emqttd_metrics:inc('bytes/received', size(Data)), emqttd_metrics:inc('bytes/received', size(Data)),
process_received_bytes(Data, process_received_bytes(Data,
control_throttle(State #state{await_recv = false})); control_throttle(State #state{await_recv = false}));
@ -127,31 +127,31 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = Pee
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State); network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = PeerName}) -> handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]), lager:critical("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
{noreply, State}; {noreply, State};
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
lager:debug("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
{noreply, State#state{ keepalive = KeepAlive }}; {noreply, State#state{ keepalive = KeepAlive }};
handle_info({keepalive, timeout}, State = #state{keepalive = KeepAlive}) -> handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
case emqttd_keepalive:resume(KeepAlive) of case emqttd_keepalive:resume(KeepAlive) of
timeout -> timeout ->
lager:debug("Client ~s: Keepalive Timeout!", [State#state.peer_name]), lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
{resumed, KeepAlive1} -> {resumed, KeepAlive1} ->
lager:debug("Client ~s: Keepalive Resumed", [State#state.peer_name]), lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
{noreply, State#state{keepalive = KeepAlive1}} {noreply, State#state{keepalive = KeepAlive1}}
end; end;
handle_info(Info, State = #state{peer_name = PeerName}) -> handle_info(Info, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]), lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
{stop, {badinfo, Info}, State}. {stop, {badinfo, Info}, State}.
terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) -> terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) ->
lager:debug("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), lager:debug("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]),
notify(disconnected, Reason, ProtoState), notify(disconnected, Reason, ProtoState),
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of case {ProtoState, Reason} of
@ -201,8 +201,9 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
end. end.
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
network_error(Reason, State = #state{peer_name = PeerName}) -> network_error(Reason, State = #state{peername = Peername}) ->
lager:warning("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), lager:warning("Client ~s: MQTT detected network error '~p'",
[emqttd_net:format(Peername), Reason]),
stop({shutdown, conn_closed}, State). stop({shutdown, conn_closed}, State).
run_socket(State = #state{conn_state = blocked}) -> run_socket(State = #state{conn_state = blocked}) ->

View File

@ -79,10 +79,10 @@ cluster([SNode]) ->
end. end.
useradd([Username, Password]) -> useradd([Username, Password]) ->
?PRINT("~p", [emqttd_auth:add(list_to_binary(Username), list_to_binary(Password))]). ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]).
userdel([Username]) -> userdel([Username]) ->
?PRINT("~p", [emqttd_auth:delete(list_to_binary(Username))]). ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]).
vm([]) -> vm([]) ->
[vm([Name]) || Name <- ["load", "memory", "process", "io"]]; [vm([Name]) || Name <- ["load", "memory", "process", "io"]];
@ -128,13 +128,13 @@ bridges(["list"]) ->
end, emqttd_bridge_sup:bridges()); end, emqttd_bridge_sup:bridges());
bridges(["start", SNode, Topic]) -> bridges(["start", SNode, Topic]) ->
case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of
{ok, _} -> ?PRINT_MSG("bridge is started.~n"); {ok, _} -> ?PRINT_MSG("bridge is started.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error]) {error, Error} -> ?PRINT("error: ~p~n", [Error])
end; end;
bridges(["stop", SNode, Topic]) -> bridges(["stop", SNode, Topic]) ->
case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), bin(Topic)) of
ok -> ?PRINT_MSG("bridge is stopped.~n"); ok -> ?PRINT_MSG("bridge is stopped.~n");
{error, Error} -> ?PRINT("error: ~p~n", [Error]) {error, Error} -> ?PRINT("error: ~p~n", [Error])
end. end.
@ -184,3 +184,5 @@ loads() ->
ftos(F) -> ftos(F) ->
[S] = io_lib:format("~.2f", [F]), S. [S] = io_lib:format("~.2f", [F]), S.
bin(S) when is_list(S) -> list_to_binary(S);
bin(B) when is_binary(B) -> B.

View File

@ -30,7 +30,7 @@
-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
-export([peername/1, sockname/1, peer_string/1, connection_string/2]). -export([peername/1, sockname/1, format/2, format/1, connection_string/2]).
-include_lib("kernel/include/inet.hrl"). -include_lib("kernel/include/inet.hrl").
@ -200,16 +200,15 @@ setopts(Sock, Options) when is_port(Sock) ->
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
peer_string(Sock) ->
case peername(Sock) of
{ok, {Addr, Port}} ->
{ok, lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port]))};
Error ->
Error
end.
peername(Sock) when is_port(Sock) -> inet:peername(Sock). peername(Sock) when is_port(Sock) -> inet:peername(Sock).
format(sockname, SockName) ->
format(SockName);
format(peername, PeerName) ->
format(PeerName).
format({Addr, Port}) ->
lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port])).
ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) -> ntoa(IP) ->

View File

@ -41,11 +41,12 @@
-record(proto_state, { -record(proto_state, {
transport, transport,
socket, socket,
peer_name, peername,
connected = false, %received CONNECT action? connected = false, %received CONNECT action?
proto_ver, proto_ver,
proto_name, proto_name,
%packet_id, %packet_id,
username,
client_id, client_id,
clean_sess, clean_sess,
session, %% session state or session pid session, %% session state or session pid
@ -59,7 +60,7 @@ init({Transport, Socket, Peername}, Opts) ->
#proto_state{ #proto_state{
transport = Transport, transport = Transport,
socket = Socket, socket = Socket,
peer_name = Peername, peername = Peername,
max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}.
client_id(#proto_state{client_id = ClientId}) -> ClientId. client_id(#proto_state{client_id = ClientId}) -> ClientId.
@ -90,9 +91,9 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
received(_Packet, State = #proto_state{connected = false}) -> received(_Packet, State = #proto_state{connected = false}) ->
{error, protocol_not_connected, State}; {error, protocol_not_connected, State};
received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName, received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
client_id = ClientId}) -> client_id = ClientId}) ->
lager:debug("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
case validate_packet(Packet) of case validate_packet(Packet) of
ok -> ok ->
handle(Packet, State); handle(Packet, State);
@ -100,7 +101,7 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName,
{error, Reason, State} {error, Reason, State}
end. end.
handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}) -> handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) ->
#mqtt_packet_connect{proto_ver = ProtoVer, #mqtt_packet_connect{proto_ver = ProtoVer,
username = Username, username = Username,
@ -109,33 +110,36 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}
keep_alive = KeepAlive, keep_alive = KeepAlive,
client_id = ClientId} = Var, client_id = ClientId} = Var,
lager:debug("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
{ReturnCode1, State1} = State1 = State#proto_state{proto_ver = ProtoVer,
username = Username,
client_id = ClientId,
clean_sess = CleanSess},
{ReturnCode1, State2} =
case validate_connect(Var, State) of case validate_connect(Var, State) of
?CONNACK_ACCEPT -> ?CONNACK_ACCEPT ->
case emqttd_auth:check(Username, Password) of User = #mqtt_user{username = Username, ipaddr = Addr, clientid = ClientId},
true -> case emqttd_auth:login(User, Password) of
ok ->
ClientId1 = clientid(ClientId, State), ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive), start_keepalive(KeepAlive),
emqttd_cm:register(ClientId1, self()), emqttd_cm:register(ClientId1, self()),
{?CONNACK_ACCEPT, State#proto_state{proto_ver = ProtoVer, {?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1,
client_id = ClientId1, will_msg = willmsg(Var)}};
clean_sess = CleanSess, {error, Reason}->
will_msg = willmsg(Var)}}; lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]),
false -> {?CONNACK_CREDENTIALS, State1}
lager:error("~s@~s: username '~s' login failed - no credentials", [ClientId, PeerName, Username]),
{?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}}
end; end;
ReturnCode -> ReturnCode ->
{ReturnCode, State#proto_state{client_id = ClientId, {ReturnCode, State1}
clean_sess = CleanSess}}
end, end,
notify(connected, ReturnCode1, State1), notify(connected, ReturnCode1, State2),
send(?CONNACK_PACKET(ReturnCode1), State1), send(?CONNACK_PACKET(ReturnCode1), State2),
%%Starting session %%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}),
{ok, State1#proto_state{session = Session}}; {ok, State2#proto_state{session = Session}};
handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload),
State = #proto_state{session = Session}) -> State = #proto_state{session = Session}) ->
@ -197,11 +201,11 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
{Message1, NewSession} = emqttd_session:store(Session, Message), {Message1, NewSession} = emqttd_session:store(Session, Message),
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession}); send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
lager:debug("SENT to ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]),
sent_stats(Packet), sent_stats(Packet),
Data = emqttd_serialiser:serialise(Packet), Data = emqttd_serialiser:serialise(Packet),
lager:debug("SENT to ~s: ~p", [PeerName, Data]), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
emqttd_metrics:inc('bytes/sent', size(Data)), emqttd_metrics:inc('bytes/sent', size(Data)),
Transport:send(Sock, Data), Transport:send(Sock, Data),
{ok, State}. {ok, State}.
@ -212,17 +216,17 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_nam
redeliver({?PUBREL, PacketId}, State) -> redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State). send(?PUBREL_PACKET(PacketId), State).
shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) -> shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
send_willmsg(WillMsg), send_willmsg(WillMsg),
try_unregister(ClientId, self()), try_unregister(ClientId, self()),
lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, PeerName, Error]), lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]),
ok. ok.
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqttd_message:from_packet(Packet). emqttd_message:from_packet(Packet).
clientid(<<>>, #proto_state{peer_name = PeerName}) -> clientid(<<>>, #proto_state{peername = Peername}) ->
<<"eMQTT/", (base64:encode(PeerName))/binary>>; <<"eMQTT/", (base64:encode(emqttd_net:format(Peername)))/binary>>;
clientid(ClientId, _State) -> ClientId. clientid(ClientId, _State) -> ClientId.
@ -324,7 +328,7 @@ inc(?PINGRESP) ->
inc(_) -> inc(_) ->
ingore. ingore.
notify(connected, ReturnCode, #proto_state{peer_name = PeerName, notify(connected, ReturnCode, #proto_state{peername = Peername,
proto_ver = ProtoVer, proto_ver = ProtoVer,
client_id = ClientId, client_id = ClientId,
clean_sess = CleanSess}) -> clean_sess = CleanSess}) ->
@ -332,7 +336,7 @@ notify(connected, ReturnCode, #proto_state{peer_name = PeerName,
true -> false; true -> false;
false -> true false -> true
end, end,
Params = [{from, PeerName}, Params = [{from, emqttd_net:format(Peername)},
{protocol, ProtoVer}, {protocol, ProtoVer},
{session, Sess}, {session, Sess},
{connack, ReturnCode}], {connack, ReturnCode}],