From 174226c0b018b38b07774833b1343c457b6a9358 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Tue, 7 Apr 2015 17:12:12 +0800 Subject: [PATCH] authentication with clientid --- apps/emqttd/src/emqttd.app.src | 2 +- apps/emqttd/src/emqttd_auth_clientid.erl | 60 ++++++++++++++++++---- apps/emqttd/src/emqttd_auth_username.erl | 21 ++++---- apps/emqttd/src/emqttd_client.erl | 37 +++++++------- apps/emqttd/src/emqttd_ctl.erl | 10 ++-- apps/emqttd/src/emqttd_net.erl | 17 +++---- apps/emqttd/src/emqttd_protocol.erl | 64 +++++++++++++----------- 7 files changed, 129 insertions(+), 82 deletions(-) diff --git a/apps/emqttd/src/emqttd.app.src b/apps/emqttd/src/emqttd.app.src index 3c0bbd24c..26053005b 100644 --- a/apps/emqttd/src/emqttd.app.src +++ b/apps/emqttd/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.5.4"}, + {vsn, "0.6.0"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index bc9c5c132..4f45d02b2 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -41,7 +41,7 @@ -define(AUTH_CLIENTID_TABLE, mqtt_auth_clientid). --record(?AUTH_CLIENTID_TABLE, {clientid, password = undefined}). +-record(?AUTH_CLIENTID_TABLE, {clientid, ipaddr, password}). add_clientid(ClientId) when is_binary(ClientId) -> R = #mqtt_auth_clientid{clientid = ClientId}, @@ -62,17 +62,24 @@ remove_clientid(ClientId) -> init(Opts) -> mnesia:create_table(?AUTH_CLIENTID_TABLE, [ - {type, set}, - {disc_copies, [node()]}, + {ram_copies, [node()]}, {attributes, record_info(fields, ?AUTH_CLIENTID_TABLE)}]), - mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), disc_copies), + mnesia:add_table_copy(?AUTH_CLIENTID_TABLE, node(), ram_copies), + case proplists:get_value(file, Opts) of + undefined -> ok; + File -> load(File) + end, {ok, Opts}. -check(#mqtt_user{clientid = ClientId}, _Password, []) -> - check_clientid_only(ClientId); -check(#mqtt_user{clientid = ClientId}, _Password, [{password, no}|_]) -> - check_clientid_only(ClientId); -check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) -> +check(#mqtt_user{clientid = undefined}, _Password, []) -> + {error, "ClientId undefined"}; +check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, []) -> + check_clientid_only(ClientId, IpAddr); +check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) -> + check_clientid_only(ClientId, IpAddr); +check(_User, undefined, [{password, yes}|_]) -> + {error, "Password undefined"}; +check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) -> case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of [] -> {error, "ClientId Not Found"}; [#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext?? @@ -85,10 +92,41 @@ description() -> "ClientId authentication module". %%% Internal functions %%%============================================================================= -check_clientid_only(ClientId) -> +load(File) -> + {ok, Fd} = file:open(File, [read]), + load(Fd, file:read_line(Fd), []). + +load(Fd, {ok, Line}, Clients) when is_list(Line) -> + Clients1 = + case string:tokens(Line, " ") of + [ClientIdS] -> + ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)), + [#mqtt_auth_clientid{clientid = ClientId} | Clients]; + [ClientId, IpAddr0] -> + IpAddr = string:strip(IpAddr0, right, $\n), + Range = esockd_access:range(IpAddr), + [#mqtt_auth_clientid{clientid = list_to_binary(ClientId), + ipaddr = {IpAddr, Range}}|Clients]; + BadLine -> + lager:error("BadLine in clients.config: ~s", [BadLine]), + Clients + end, + load(Fd, file:read_line(Fd), Clients1); + +load(Fd, eof, Clients) -> + mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end), + file:close(Fd). + +check_clientid_only(ClientId, IpAddr) -> case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of [] -> {error, "ClientId Not Found"}; - _ -> ok + [#?AUTH_CLIENTID_TABLE{ipaddr = undefined}] -> ok; + [#?AUTH_CLIENTID_TABLE{ipaddr = {_, {Start, End}}}] -> + I = esockd_access:atoi(IpAddr), + case I >= Start andalso I =< End of + true -> ok; + false -> {error, "ClientId with wrong IP address"} + end end. diff --git a/apps/emqttd/src/emqttd_auth_username.erl b/apps/emqttd/src/emqttd_auth_username.erl index c6d1c5b38..187578b66 100644 --- a/apps/emqttd/src/emqttd_auth_username.erl +++ b/apps/emqttd/src/emqttd_auth_username.erl @@ -62,18 +62,21 @@ all_users() -> %%%============================================================================= init(Opts) -> mnesia:create_table(?AUTH_USERNAME_TABLE, [ - {type, set}, - {disc_copies, [node()]}, + {ram_copies, [node()]}, {attributes, record_info(fields, ?AUTH_USERNAME_TABLE)}]), - mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), disc_copies), + mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies), {ok, Opts}. +check(#mqtt_user{username = undefined}, _Password, _Opts) -> + {error, "Username undefined"}; +check(_User, undefined, _Opts) -> + {error, "Password undefined"}; check(#mqtt_user{username = Username}, Password, _Opts) -> case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of [] -> {error, "Username Not Found"}; - [#?AUTH_USERNAME_TABLE{password = <>}] -> - case Hash =:= hash(Salt, Password) of + [#?AUTH_USERNAME_TABLE{password = <>}] -> + case Hash =:= md5_hash(Salt, Password) of true -> ok; false -> {error, "Password Not Right"} end @@ -86,11 +89,11 @@ description() -> "Username password authentication module". %%%============================================================================= hash(Password) -> - hash(salt(), Password). + SaltBin = salt(), + <>. -hash(SaltBin, Password) -> - Hash = erlang:md5(<>), - <>. +md5_hash(SaltBin, Password) -> + erlang:md5(<>). salt() -> {A1,A2,A3} = now(), diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 1b60b69cc..a7401aeb7 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -46,7 +46,7 @@ %%Client State... -record(state, {transport, socket, - peer_name, + peername, conn_name, await_recv, conn_state, @@ -66,14 +66,14 @@ info(Pid) -> init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> %transform if ssl. {ok, NewSock} = esockd_connection:accept(SockArgs), - {ok, Peername} = emqttd_net:peer_string(Sock), + {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), ParserState = emqttd_parser:init(PacketOpts), ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, - peer_name = Peername, + peername = Peername, conn_name = ConnStr, await_recv = false, conn_state = running, @@ -118,8 +118,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = handle_info({inet_reply, _Ref, ok}, State) -> {noreply, State, hibernate}; -handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = PeerName, socket = Sock}) -> - lager:debug("RECV from ~s: ~p", [PeerName, Data]), +handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> + lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/received', size(Data)), process_received_bytes(Data, control_throttle(State #state{await_recv = false})); @@ -127,31 +127,31 @@ handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peer_name = Pee handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> network_error(Reason, State); -handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peer_name = PeerName}) -> - lager:critical("Client ~s: unexpected inet_reply '~p'", [PeerName, Reason]), +handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), {noreply, State}; -handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> - lager:debug("Client ~s: Start KeepAlive with ~p seconds", [State#state.peer_name, TimeoutSec]), +handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> + lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]), KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}), {noreply, State#state{ keepalive = KeepAlive }}; -handle_info({keepalive, timeout}, State = #state{keepalive = KeepAlive}) -> +handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) -> case emqttd_keepalive:resume(KeepAlive) of timeout -> - lager:debug("Client ~s: Keepalive Timeout!", [State#state.peer_name]), + lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]), stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); {resumed, KeepAlive1} -> - lager:debug("Client ~s: Keepalive Resumed", [State#state.peer_name]), + lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]), {noreply, State#state{keepalive = KeepAlive1}} end; -handle_info(Info, State = #state{peer_name = PeerName}) -> - lager:critical("Client ~s: unexpected info ~p",[PeerName, Info]), +handle_info(Info, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), {stop, {badinfo, Info}, State}. -terminate(Reason, #state{peer_name = PeerName, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:debug("Client ~s: ~p terminated, reason: ~p~n", [PeerName, self(), Reason]), +terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> + lager:debug("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]), notify(disconnected, Reason, ProtoState), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -201,8 +201,9 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, end. %%---------------------------------------------------------------------------- -network_error(Reason, State = #state{peer_name = PeerName}) -> - lager:warning("Client ~s: MQTT detected network error '~p'", [PeerName, Reason]), +network_error(Reason, State = #state{peername = Peername}) -> + lager:warning("Client ~s: MQTT detected network error '~p'", + [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). run_socket(State = #state{conn_state = blocked}) -> diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index db6071827..23106844c 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -79,10 +79,10 @@ cluster([SNode]) -> end. useradd([Username, Password]) -> - ?PRINT("~p", [emqttd_auth:add(list_to_binary(Username), list_to_binary(Password))]). + ?PRINT("~p~n", [emqttd_auth_username:add_user(bin(Username), bin(Password))]). userdel([Username]) -> - ?PRINT("~p", [emqttd_auth:delete(list_to_binary(Username))]). + ?PRINT("~p~n", [emqttd_auth_username:remove_user(bin(Username))]). vm([]) -> [vm([Name]) || Name <- ["load", "memory", "process", "io"]]; @@ -128,13 +128,13 @@ bridges(["list"]) -> end, emqttd_bridge_sup:bridges()); bridges(["start", SNode, Topic]) -> - case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of {ok, _} -> ?PRINT_MSG("bridge is started.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; bridges(["stop", SNode, Topic]) -> - case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), list_to_binary(Topic)) of + case emqttd_bridge_sup:stop_bridge(list_to_atom(SNode), bin(Topic)) of ok -> ?PRINT_MSG("bridge is stopped.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end. @@ -184,3 +184,5 @@ loads() -> ftos(F) -> [S] = io_lib:format("~.2f", [F]), S. +bin(S) when is_list(S) -> list_to_binary(S); +bin(B) when is_binary(B) -> B. diff --git a/apps/emqttd/src/emqttd_net.erl b/apps/emqttd/src/emqttd_net.erl index 7049118de..199fcd4fd 100644 --- a/apps/emqttd/src/emqttd_net.erl +++ b/apps/emqttd/src/emqttd_net.erl @@ -30,7 +30,7 @@ -export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]). --export([peername/1, sockname/1, peer_string/1, connection_string/2]). +-export([peername/1, sockname/1, format/2, format/1, connection_string/2]). -include_lib("kernel/include/inet.hrl"). @@ -200,16 +200,15 @@ setopts(Sock, Options) when is_port(Sock) -> sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). -peer_string(Sock) -> - case peername(Sock) of - {ok, {Addr, Port}} -> - {ok, lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port]))}; - Error -> - Error - end. - peername(Sock) when is_port(Sock) -> inet:peername(Sock). +format(sockname, SockName) -> + format(SockName); +format(peername, PeerName) -> + format(PeerName). +format({Addr, Port}) -> + lists:flatten(io_lib:format("~s:~p", [maybe_ntoab(Addr), Port])). + ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); ntoa(IP) -> diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index d49926d33..7629a46e3 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -41,11 +41,12 @@ -record(proto_state, { transport, socket, - peer_name, + peername, connected = false, %received CONNECT action? proto_ver, proto_name, %packet_id, + username, client_id, clean_sess, session, %% session state or session pid @@ -59,7 +60,7 @@ init({Transport, Socket, Peername}, Opts) -> #proto_state{ transport = Transport, socket = Socket, - peer_name = Peername, + peername = Peername, max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. client_id(#proto_state{client_id = ClientId}) -> ClientId. @@ -90,9 +91,9 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> received(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; -received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName, +received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, client_id = ClientId}) -> - lager:debug("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), + lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), case validate_packet(Packet) of ok -> handle(Packet, State); @@ -100,7 +101,7 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peer_name = PeerName, {error, Reason, State} end. -handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName}) -> +handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) -> #mqtt_packet_connect{proto_ver = ProtoVer, username = Username, @@ -109,33 +110,36 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peer_name = PeerName} keep_alive = KeepAlive, client_id = ClientId} = Var, - lager:debug("RECV from ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), + lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), - {ReturnCode1, State1} = + State1 = State#proto_state{proto_ver = ProtoVer, + username = Username, + client_id = ClientId, + clean_sess = CleanSess}, + {ReturnCode1, State2} = case validate_connect(Var, State) of ?CONNACK_ACCEPT -> - case emqttd_auth:check(Username, Password) of - true -> + User = #mqtt_user{username = Username, ipaddr = Addr, clientid = ClientId}, + case emqttd_auth:login(User, Password) of + ok -> ClientId1 = clientid(ClientId, State), start_keepalive(KeepAlive), emqttd_cm:register(ClientId1, self()), - {?CONNACK_ACCEPT, State#proto_state{proto_ver = ProtoVer, - client_id = ClientId1, - clean_sess = CleanSess, - will_msg = willmsg(Var)}}; - false -> - lager:error("~s@~s: username '~s' login failed - no credentials", [ClientId, PeerName, Username]), - {?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}} + {?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1, + will_msg = willmsg(Var)}}; + {error, Reason}-> + lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), + {?CONNACK_CREDENTIALS, State1} + end; ReturnCode -> - {ReturnCode, State#proto_state{client_id = ClientId, - clean_sess = CleanSess}} + {ReturnCode, State1} end, - notify(connected, ReturnCode1, State1), - send(?CONNACK_PACKET(ReturnCode1), State1), + notify(connected, ReturnCode1, State2), + send(?CONNACK_PACKET(ReturnCode1), State2), %%Starting session {ok, Session} = emqttd_session:start({CleanSess, ClientId, self()}), - {ok, State1#proto_state{session = Session}}; + {ok, State2#proto_state{session = Session}}; handle(Packet = ?PUBLISH_PACKET(?QOS_0, _Topic, _PacketId, _Payload), State = #proto_state{session = Session}) -> @@ -197,11 +201,11 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = {Message1, NewSession} = emqttd_session:store(Session, Message), send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession}); -send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_name = PeerName, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> - lager:debug("SENT to ~s@~s: ~s", [ClientId, PeerName, emqttd_packet:dump(Packet)]), +send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> + lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), sent_stats(Packet), Data = emqttd_serialiser:serialise(Packet), - lager:debug("SENT to ~s: ~p", [PeerName, Data]), + lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), Transport:send(Sock, Data), {ok, State}. @@ -212,17 +216,17 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peer_nam redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). -shutdown(Error, #proto_state{peer_name = PeerName, client_id = ClientId, will_msg = WillMsg}) -> +shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) -> send_willmsg(WillMsg), try_unregister(ClientId, self()), - lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, PeerName, Error]), + lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]), ok. willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqttd_message:from_packet(Packet). -clientid(<<>>, #proto_state{peer_name = PeerName}) -> - <<"eMQTT/", (base64:encode(PeerName))/binary>>; +clientid(<<>>, #proto_state{peername = Peername}) -> + <<"eMQTT/", (base64:encode(emqttd_net:format(Peername)))/binary>>; clientid(ClientId, _State) -> ClientId. @@ -324,7 +328,7 @@ inc(?PINGRESP) -> inc(_) -> ingore. -notify(connected, ReturnCode, #proto_state{peer_name = PeerName, +notify(connected, ReturnCode, #proto_state{peername = Peername, proto_ver = ProtoVer, client_id = ClientId, clean_sess = CleanSess}) -> @@ -332,7 +336,7 @@ notify(connected, ReturnCode, #proto_state{peer_name = PeerName, true -> false; false -> true end, - Params = [{from, PeerName}, + Params = [{from, emqttd_net:format(Peername)}, {protocol, ProtoVer}, {session, Sess}, {connack, ReturnCode}],