From d3616a27015c4f414e63fe3dedc9ce96fa2b223a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 28 May 2015 21:55:20 +0800 Subject: [PATCH] client presence management --- apps/emqttd/include/emqttd.hrl | 7 +- apps/emqttd/src/emqttd_access_rule.erl | 4 +- apps/emqttd/src/emqttd_auth_clientid.erl | 8 +- apps/emqttd/src/emqttd_client.erl | 32 ++---- apps/emqttd/src/emqttd_cm.erl | 6 +- apps/emqttd/src/emqttd_mod_autosub.erl | 22 +++- apps/emqttd/src/emqttd_mod_presence.erl | 44 ++++++-- apps/emqttd/src/emqttd_protocol.erl | 135 ++++++++++++----------- apps/emqttd/src/emqttd_util.erl | 10 +- apps/emqttd/src/emqttd_vm.erl | 11 ++ 10 files changed, 162 insertions(+), 117 deletions(-) diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index 9c2ab934a..182959df6 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -77,9 +77,12 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - clientid :: binary(), + clientid :: binary() | undefined, username :: binary() | undefined, - ipaddr :: inet:ip_address() + ipaddress :: inet:ip_address(), + clean_sess :: boolean(), + proto_ver :: 3 | 4, + client_pid :: pid() }). -type mqtt_client() :: #mqtt_client{}. diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 8f7761822..a492136cf 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -114,9 +114,9 @@ match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> true; match_who(#mqtt_client{username = Username}, {user, Username}) -> true; -match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) -> +match_who(#mqtt_client{ipaddress = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) -> +match_who(#mqtt_client{ipaddress = IP}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; match_who(_Client, _Who) -> diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 2f3d51617..e5171a239 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -101,10 +101,10 @@ init(Opts) -> check(#mqtt_client{clientid = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, []) -> - check_clientid_only(ClientId, IpAddr); -check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) -> - check_clientid_only(ClientId, IpAddr); +check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) -> + check_clientid_only(ClientId, IpAddress); +check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> + check_clientid_only(ClientId, IpAddress); check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 0fd1db69a..3d8e170b4 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -87,11 +87,13 @@ handle_call(info, _From, State = #state{conn_name=ConnName, proto_state = ProtoState}) -> {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; -handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. +handle_call(Req, _From, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected request - ~p",[emqttd_net:format(Peername), Req]), + {reply, {error, unsupported_request}, State}. -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. +handle_cast(Msg, State = #state{peername = Peername}) -> + lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), + {noreply, State}. handle_info(timeout, State) -> stop({shutdown, timeout}, State); @@ -102,7 +104,7 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState %% need transfer data??? %% emqttd_client:transfer(NewPid, Data), lager:error("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:clientid(ProtoState), ConnName]), + [emqttd_protocol:clientid(ProtoState), ConnName]), stop({shutdown, duplicate_id}, State); %%TODO: ok?? @@ -158,17 +160,16 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive handle_info(Info, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), - {stop, {badinfo, Info}, State}. + {noreply, State}. terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:info("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]), - notify(disconnected, Reason, ProtoState), + lager:info("Client ~s terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> emqttd_protocol:shutdown(Error, ProtoState); - {_, Reason} -> + {_, Reason} -> emqttd_protocol:shutdown(Reason, ProtoState) end. @@ -231,7 +232,7 @@ control_throttle(State = #state{conn_state = Flow, {_, _} -> run_socket(State) end. -stop(Reason, State ) -> +stop(Reason, State) -> {stop, Reason, State}. received_stats(?PACKET(Type)) -> @@ -253,14 +254,3 @@ inc(?DISCONNECT) -> inc(_) -> ignore. -%%TODO: should be moved to emqttd_protocol... for event emitted when protocol shutdown... -notify(disconnected, _Reason, undefined) -> ingore; - -notify(disconnected, {shutdown, Reason}, ProtoState) -> - %emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}); - ok; - -notify(disconnected, Reason, ProtoState) -> - %emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}). - ok. - diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 7758f8ec0..1c109536e 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -28,6 +28,8 @@ -author("Feng Lee "). +-include("emqttd.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). @@ -69,10 +71,10 @@ table() -> ?CLIENT_TAB. %% @doc Lookup client pid with clientId %% @end %%------------------------------------------------------------------------------ --spec lookup(ClientId :: binary()) -> pid() | undefined. +-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. lookup(ClientId) when is_binary(ClientId) -> case ets:lookup(?CLIENT_TAB, ClientId) of - [{_, Pid, _}] -> Pid; + [Client] -> Client; [] -> undefined end. diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index 0ed1be5e1..a28db1642 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -29,22 +29,32 @@ -author("Feng Lee "). +-include_lib("emqtt/include/emqtt.hrl"). + +-include_lib("emqtt/include/emqtt_packet.hrl"). + +-include("emqttd.hrl"). + -behaviour(emqttd_gen_mod). --export([load/1, subscribe/2, unload/1]). +-export([load/1, client_connected/3, unload/1]). -record(state, {topics}). load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, subscribe}, - {?MODULE, subscribe, [Topics]}), + emqttd_broker:hook(client_connected, {?MODULE, client_connected}, + {?MODULE, client_connected, [Topics]}), {ok, #state{topics = Topics}}. -subscribe({Client, ClientId}, Topics) -> +client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, - [Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]. + [ClientPid ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]; + +client_connected(_ConnAck, _Client, _Topics) -> + ignore. unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, subscribe}). + emqttd_broker:unhook(client_connected, {?MODULE, client_connected}). + diff --git a/apps/emqttd/src/emqttd_mod_presence.erl b/apps/emqttd/src/emqttd_mod_presence.erl index ca66177e7..6432e0189 100644 --- a/apps/emqttd/src/emqttd_mod_presence.erl +++ b/apps/emqttd/src/emqttd_mod_presence.erl @@ -28,26 +28,52 @@ -include_lib("emqtt/include/emqtt.hrl"). +-include("emqttd.hrl"). + -export([load/1, unload/1]). --export([client_connected/2, client_disconnected/2]). +-export([client_connected/3, client_disconnected/3]). load(Opts) -> emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. -client_connected({Client, ClientId}, _Opts) -> - Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])), - Payload = iolist_to_binary(mochijson2:encode([{ts, emqttd_util:timestamp()}])), - emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}). +client_connected(ConnAck, #mqtt_client{clientid = ClientId, + username = Username, + ipaddress = IpAddress, + clean_sess = CleanSess, + proto_ver = ProtoVer}, _Opts) -> + Sess = case CleanSess of + true -> false; + false -> true + end, + Json = mochijson2:encode([{username, Username}, + {ipaddress, emqttd_net:ntoa(IpAddress)}, + {session, Sess}, + {protocol, ProtoVer}, + {connack, ConnAck}, + {ts, emqttd_vm:timestamp()}]), + Message = #mqtt_message{topic = topic(connected, ClientId), + payload = iolist_to_binary(Json)}, + emqttd_pubsub:publish(presence, Message). -client_disconnected({ClientId, Reason}, _Opts) -> - Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])), - Payload = iolist_to_binary(mochijson2:encode([{reason, Reason}, {ts, emqttd_util:timestamp()}])), - emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}). +client_disconnected(Reason, ClientId, _Opts) -> + Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]), + emqttd_pubsub:publish(presence, #mqtt_message{topic = topic(disconnected, ClientId), + payload = iolist_to_binary(Json)}). unload(_Opts) -> emqttd_broker:unhook(client_connected, {?MODULE, client_connected}), emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}). + +topic(connected, ClientId) -> + emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); +topic(disconnected, ClientId) -> + emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). + +reason(Reason) when is_atom(Reason) -> Reason; +reason({Error, _}) when is_atom(Error) -> Error; +reason(_) -> internal_error. + diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 30013ac3e..4afa85fc3 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -29,19 +29,18 @@ -author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). + -include_lib("emqtt/include/emqtt_packet.hrl"). -include("emqttd.hrl"). %% API --export([init/3, clientid/1]). +-export([init/3, info/1, clientid/1, client/1]). -export([received/2, send/2, redeliver/2, shutdown/2]). -export([handle/2]). --export([info/1]). - %% Protocol State -record(proto_state, { peername, @@ -49,30 +48,29 @@ connected = false, %received CONNECT action? proto_ver, proto_name, - %packet_id, username, clientid, clean_sess, - session, %% session state or session pid + session, %% session state or session pid will_msg, - max_clientid_len = ?MAX_CLIENTID_LEN + max_clientid_len = ?MAX_CLIENTID_LEN, + client_pid }). -type proto_state() :: #proto_state{}. +%%------------------------------------------------------------------------------ +%% @doc Init protocol +%% @end +%%------------------------------------------------------------------------------ init(Peername, SendFun, Opts) -> MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), #proto_state{ peername = Peername, sendfun = SendFun, - max_clientid_len = MaxLen}. + max_clientid_len = MaxLen, + client_pid = self()}. -clientid(#proto_state{clientid = ClientId}) -> ClientId. - -client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) -> - #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}. - -%%SHOULD be registered in emqttd_cm info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, clientid = ClientId, @@ -80,11 +78,27 @@ info(#proto_state{proto_ver = ProtoVer, will_msg = WillMsg}) -> [{proto_ver, ProtoVer}, {proto_name, ProtoName}, - {clientid, ClientId}, + {clientid, ClientId}, {clean_sess, CleanSess}, {will_msg, WillMsg}]. -%%CONNECT – Client requests a connection to a Server +clientid(#proto_state{clientid = ClientId}) -> + ClientId. + +client(#proto_state{peername = {Addr, _Port}, + clientid = ClientId, + username = Username, + clean_sess = CleanSess, + proto_ver = ProtoVer, + client_pid = Pid}) -> + #mqtt_client{clientid = ClientId, + username = Username, + ipaddress = Addr, + clean_sess = CleanSess, + proto_ver = ProtoVer, + client_pid = Pid}. + +%% CONNECT – Client requests a connection to a Server %%A Client can only send the CONNECT Packet once over a Network Connection. -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}. @@ -107,42 +121,45 @@ received(Packet = ?PACKET(_Type), State) -> {error, Reason, State} end. -handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) -> +handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername = {Addr, _}}) -> #mqtt_packet_connect{proto_ver = ProtoVer, + proto_name = ProtoName, username = Username, password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId} = Var, + clientid = ClientId} = Var, - trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later... + State1 = State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + clientid = ClientId, + clean_sess = CleanSess}, - State1 = State#proto_state{proto_ver = ProtoVer, - username = Username, - clientid = ClientId, - clean_sess = CleanSess}, - {ReturnCode1, State2} = - case validate_connect(Var, State) of + trace(recv, Packet, State1), + + {ReturnCode1, State3} = + case validate_connect(Var, State1) of ?CONNACK_ACCEPT -> - Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}, - case emqttd_access_control:auth(Client, Password) of + case emqttd_access_control:auth(client(State1), Password) of ok -> - %% Generate one if null - ClientId1 = clientid(ClientId, State), - %% Register clientId - emqttd_cm:register(ClientId1), + %% Generate clientId if null + State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, + + %% Register the client to cm + emqttd_cm:register(client(State2)), + %%Starting session - {ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}), + {ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}), + %% Start keepalive start_keepalive(KeepAlive), - %% Run hooks - emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]), - {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, - session = Session, - will_msg = willmsg(Var)}}; + + %% ACCEPT + {?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}}; {error, Reason}-> - lager:error("~s@~s: username '~s' login failed - ~s", + lager:error("~s@~s: username '~s', login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), {?CONNACK_CREDENTIALS, State1} @@ -150,9 +167,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = ReturnCode -> {ReturnCode, State1} end, - %%TODO: this is not right... - notify(connected, ReturnCode1, State2), - send(?CONNACK_PACKET(ReturnCode1), State2); + %% Run hooks + emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]), + %% Send connack + send(?CONNACK_PACKET(ReturnCode1), State3); handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> @@ -251,7 +269,6 @@ send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> %% message from session send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) -> send(emqtt_message:to_packet(Message), State); - %% message(qos1, qos2) not from session send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> @@ -279,12 +296,21 @@ trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). +shutdown(duplicate_id, _State) -> + quiet; %% + +shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) -> + lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown", + [ClientId, emqttd_net:format(Peername)]), + try_unregister(ClientId), + emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]); + shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> - send_willmsg(ClientId, WillMsg), - try_unregister(ClientId, self()), - lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p", + lager:info([{client, ClientId}], "Protocol ~s@~s: Shutdown for ~p", [ClientId, emqttd_net:format(Peername), Error]), - ok. + send_willmsg(ClientId, WillMsg), + try_unregister(ClientId), + emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]). willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqtt_message:from_packet(Packet). @@ -377,8 +403,8 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. -try_unregister(undefined, _) -> ok; -try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId). +try_unregister(undefined) -> ok; +try_unregister(ClientId) -> emqttd_cm:unregister(ClientId). %% publish ACL is cached in process dictionary. check_acl(publish, Topic, State) -> @@ -411,18 +437,3 @@ inc(?PINGRESP) -> inc(_) -> ingore. -notify(connected, ReturnCode, #proto_state{peername = Peername, - proto_ver = ProtoVer, - clientid = ClientId, - clean_sess = CleanSess}) -> - Sess = case CleanSess of - true -> false; - false -> true - end, - Params = [{from, emqttd_net:format(Peername)}, - {protocol, ProtoVer}, - {session, Sess}, - {connack, ReturnCode}]. - %emqttd_event:notify({connected, ClientId, Params}). - - diff --git a/apps/emqttd/src/emqttd_util.erl b/apps/emqttd/src/emqttd_util.erl index eca61b4b0..7ec6459e3 100644 --- a/apps/emqttd/src/emqttd_util.erl +++ b/apps/emqttd/src/emqttd_util.erl @@ -30,8 +30,7 @@ -export([apply_module_attributes/1, all_module_attributes/1, - cancel_timer/1, - timestamp/0, microsecs/0]). + cancel_timer/1]). -export([integer_to_binary/1]). @@ -91,11 +90,4 @@ cancel_timer(Ref) -> integer_to_binary(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). -timestamp() -> - {MegaSecs, Secs, _MicroSecs} = os:timestamp(), - MegaSecs * 1000000 + Secs. - -microsecs() -> - {Mega, Sec, Micro} = erlang:now(), - (Mega * 1000000 + Sec) * 1000000 + Micro. diff --git a/apps/emqttd/src/emqttd_vm.erl b/apps/emqttd/src/emqttd_vm.erl index 1e4624a03..217cf0092 100644 --- a/apps/emqttd/src/emqttd_vm.erl +++ b/apps/emqttd/src/emqttd_vm.erl @@ -24,12 +24,23 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_vm). -author("Feng Lee "). +-export([timestamp/0, microsecs/0]). + -export([loads/0]). +timestamp() -> + {MegaSecs, Secs, _MicroSecs} = os:timestamp(), + MegaSecs * 1000000 + Secs. + +microsecs() -> + {Mega, Sec, Micro} = erlang:now(), + (Mega * 1000000 + Sec) * 1000000 + Micro. + loads() -> [{load1, ftos(cpu_sup:avg1()/256)}, {load5, ftos(cpu_sup:avg5()/256)},