From d09a0787ea2dc6999aec5bba6817d6c79d2b79b4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 24 Jun 2015 22:05:00 +0800 Subject: [PATCH] fix sessions --- src/emqttd_app.erl | 3 +- src/emqttd_cm.erl | 171 ---------------------------------------- src/emqttd_cm_sup.erl | 59 -------------- src/emqttd_protocol.erl | 20 ++--- src/emqttd_sm.erl | 11 +-- src/emqttd_sm_sup.erl | 5 +- 6 files changed, 16 insertions(+), 253 deletions(-) delete mode 100644 src/emqttd_cm.erl delete mode 100644 src/emqttd_cm_sup.erl diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f9434b385..ac2dd14e8 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -70,13 +70,12 @@ print_vsn() -> start_servers(Sup) -> Servers = [{"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, - {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", {supervisor, emqttd_sm_sup}}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, + {"emqttd broker", emqttd_broker}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, - {"emqttd broker", emqttd_broker}, {"emqttd alarm", emqttd_alarm}, {"emqttd mode supervisor", emqttd_mod_sup}, {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl deleted file mode 100644 index 1c3effa31..000000000 --- a/src/emqttd_cm.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% MQTT Client Manager -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_cm). - --author("Feng Lee "). - --include("emqttd.hrl"). - --behaviour(gen_server). - --define(SERVER, ?MODULE). - -%% API Exports --export([start_link/2, pool/0, table/0]). - --export([lookup/1, register/1, unregister/1]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {id, tab, statsfun}). - --define(CM_POOL, cm_pool). - --define(CLIENT_TAB, mqtt_client). - -%%%============================================================================= -%%% API -%%%============================================================================= - -%%------------------------------------------------------------------------------ -%% @doc Start client manager -%% @end -%%------------------------------------------------------------------------------ --spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when - Id :: pos_integer(), - StatsFun :: fun(). -start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). - -pool() -> ?CM_POOL. - -table() -> ?CLIENT_TAB. - -%%------------------------------------------------------------------------------ -%% @doc Lookup client pid with clientId -%% @end -%%------------------------------------------------------------------------------ --spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. -lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CLIENT_TAB, ClientId) of - [Client] -> Client; - [] -> undefined - end. - -%%------------------------------------------------------------------------------ -%% @doc Register clientId with pid. -%% @end -%%------------------------------------------------------------------------------ --spec register(Client :: mqtt_client()) -> ok. -register(Client = #mqtt_client{clientid = ClientId}) -> - CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:call(CmPid, {register, Client}, infinity). - -%%------------------------------------------------------------------------------ -%% @doc Unregister clientId with pid. -%% @end -%%------------------------------------------------------------------------------ --spec unregister(ClientId :: binary()) -> ok. -unregister(ClientId) when is_binary(ClientId) -> - CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId), - gen_server:cast(CmPid, {unregister, ClientId, self()}). - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init([Id, StatsFun]) -> - gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), - {ok, #state{id = Id, statsfun = StatsFun}}. - -handle_call({register, Client = #mqtt_client{clientid = ClientId, client_pid = Pid}}, _From, State) -> - case ets:lookup(?CLIENT_TAB, ClientId) of - [#mqtt_client{client_pid = Pid}] -> - lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), - ignore; - [#mqtt_client{client_pid = OldPid, client_mon = MRef}] -> - lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), - OldPid ! {stop, duplicate_id, Pid}, - erlang:demonitor(MRef), - ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}); - [] -> - ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}) - end, - {reply, ok, setstats(State)}; - -handle_call(Req, _From, State) -> - lager:error("unexpected request: ~p", [Req]), - {reply, {error, badreq}, State}. - -handle_cast({unregister, ClientId, Pid}, State) -> - case ets:lookup(?CLIENT_TAB, ClientId) of - [#mqtt_client{client_pid = Pid, client_mon = MRef}] -> - erlang:demonitor(MRef, [flush]), - ets:delete(?CLIENT_TAB, ClientId); - [_] -> - ignore; - [] -> - lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid]) - end, - {noreply, setstats(State)}; - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> - case ets:match_object(?CLIENT_TAB, {mqtt_client, '$1', '_', '_', DownPid, MRef, '_', '_'}) of - [] -> - ignore; - Clients -> - lists:foreach( - fun(Client = #mqtt_client{clientid = ClientId}) -> - ets:delete_object(?CLIENT_TAB, Client), - lager:error("Client ~s is Down: ~p", [ClientId, Reason]), - emqttd_broker:foreach_hooks(client_disconnected, [Reason, ClientId]) - end, Clients) - end, - {noreply, setstats(State)}; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= - -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?CLIENT_TAB, size)), State. - - diff --git a/src/emqttd_cm_sup.erl b/src/emqttd_cm_sup.erl deleted file mode 100644 index bf9500630..000000000 --- a/src/emqttd_cm_sup.erl +++ /dev/null @@ -1,59 +0,0 @@ -%%%----------------------------------------------------------------------------- -%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. -%%% -%%% Permission is hereby granted, free of charge, to any person obtaining a copy -%%% of this software and associated documentation files (the "Software"), to deal -%%% in the Software without restriction, including without limitation the rights -%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%%% copies of the Software, and to permit persons to whom the Software is -%%% furnished to do so, subject to the following conditions: -%%% -%%% The above copyright notice and this permission notice shall be included in all -%%% copies or substantial portions of the Software. -%%% -%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%%% SOFTWARE. -%%%----------------------------------------------------------------------------- -%%% @doc -%%% emqttd client manager supervisor. -%%% -%%% @end -%%%----------------------------------------------------------------------------- --module(emqttd_cm_sup). - --author("Feng Lee "). - --include("emqttd.hrl"). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - ets:new(emqttd_cm:table(), [ordered_set, named_table, public, {keypos, 2}, - {write_concurrency, true}]), - Schedulers = erlang:system_info(schedulers), - gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]), - StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), - Children = lists:map( - fun(I) -> - Name = {emqttd_cm, I}, - gproc_pool:add_worker(emqttd_cm:pool(), Name, I), - {Name, {emqttd_cm, start_link, [I, StatsFun]}, - permanent, 10000, worker, [emqttd_cm]} - end, lists:seq(1, Schedulers)), - {ok, {{one_for_all, 10, 100}, Children}}. - - diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index dbd6bfa72..9be6b41fc 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -63,11 +63,10 @@ %%------------------------------------------------------------------------------ init(Peername, SendFun, Opts) -> MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), - #proto_state{ - peername = Peername, - sendfun = SendFun, - max_clientid_len = MaxLen, - client_pid = self()}. + #proto_state{peername = Peername, + sendfun = SendFun, + max_clientid_len = MaxLen, + client_pid = self()}. info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, @@ -145,9 +144,6 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} %% Generate clientId if null State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, - %% Register the client to cm - emqttd_cm:register(client(State2)), - %%Starting session {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), @@ -166,7 +162,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} {ReturnCode, State1} end, %% Run hooks - emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]), + emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]), %% Send connack send(?CONNACK_PACKET(ReturnCode1), State3); @@ -293,8 +289,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p", [ClientId, emqttd_net:format(Peername), Error]), send_willmsg(ClientId, WillMsg), - try_unregister(ClientId), - emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]). + emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]). willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqttd_message:from_packet(Packet). @@ -387,9 +382,6 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. -try_unregister(undefined) -> ok; -try_unregister(ClientId) -> emqttd_cm:unregister(ClientId). - %% publish ACL is cached in process dictionary. check_acl(publish, Topic, State) -> case get({acl, publish, Topic}) of diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index bbebf3db5..fca18aa76 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -53,7 +53,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {id, statsfun}). +-record(state, {id, client_statsfun, sess_statsfun}). -define(SM_POOL, sm_pool). @@ -67,11 +67,12 @@ %% @doc Start a session manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, ClientStatsFun, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - StatsFun :: fun(). -start_link(Id, StatsFun) -> - gen_server:start_link(?MODULE, [Id, StatsFun], []). + ClientStatsFun :: fun(), + SessStatsFun :: fun(). +start_link(Id, ClientStatsFun, SessStatsFun) -> + gen_server:start_link(?MODULE, [Id, ClientStatsFun, SessStatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index fc50956d1..00babaf94 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -46,12 +46,13 @@ init([]) -> {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), + SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I, StatsFun]}, + {Name, {emqttd_sm, start_link, [I, ClientStatsFun, SessStatsFun]}, permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}.