From f1632ef2dfc1f4f9eef1e0a66263a3315ae7eb49 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Apr 2015 23:20:19 +0800 Subject: [PATCH] fix issues #54 --- apps/emqttd/src/emqttd_app.erl | 2 +- apps/emqttd/src/emqttd_cm.erl | 116 +++++++++---------------- apps/emqttd/src/emqttd_cm_sup.erl | 62 +++++++++++++ apps/emqttd/src/emqttd_session.erl | 50 +++++++++-- apps/emqttd/src/emqttd_session_sup.erl | 2 +- apps/emqttd/src/emqttd_utils.erl | 8 +- 6 files changed, 155 insertions(+), 85 deletions(-) create mode 100644 apps/emqttd/src/emqttd_cm_sup.erl diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index e7e116dd6..aaf12246a 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -75,7 +75,7 @@ start_servers(Sup) -> {"emqttd config", emqttd_config}, {"emqttd event", emqttd_event}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, - {"emqttd client manager", emqttd_cm}, + {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", emqttd_sm}, {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 6b9d34848..87396ea55 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -33,24 +33,17 @@ -define(SERVER, ?MODULE). %% API Exports --export([start_link/0]). +-export([start_link/2]). -export([lookup/1, register/1, unregister/1]). -%% Stats --export([getstats/0]). - %% gen_server Function Exports --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -record(state, {tab}). --define(CLIENT_TAB, mqtt_client). +-define(POOL, cm). %%%============================================================================= %%% API @@ -62,9 +55,11 @@ %% %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, any()}. -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start_link(Id, TabId) -> {ok, pid()} | ignore | {error, any()} when + Id :: pos_integer(), + TabId :: ets:tid(). +start_link(Id, TabId) -> + gen_server:start_link(?MODULE, [Id, TabId], []). %%------------------------------------------------------------------------------ %% @doc @@ -74,7 +69,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec lookup(ClientId :: binary()) -> pid() | undefined. lookup(ClientId) when is_binary(ClientId) -> - case ets:lookup(?CLIENT_TAB, ClientId) of + case ets:lookup(emqttd_cm_sup:table(), ClientId) of [{_, Pid, _}] -> Pid; [] -> undefined end. @@ -85,12 +80,8 @@ lookup(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec register(ClientId :: binary()) -> ok. register(ClientId) when is_binary(ClientId) -> - Pid = self(), - %% this is atomic - case ets:insert_new(?CLIENT_TAB, {ClientId, Pid, undefined}) of - true -> gen_server:cast(?SERVER, {monitor, ClientId, Pid}); - false -> gen_server:cast(?SERVER, {register, ClientId, Pid}) - end. + CmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:call(CmPid, {register, ClientId, self()}, infinity). %%------------------------------------------------------------------------------ %% @doc @@ -100,54 +91,46 @@ register(ClientId) when is_binary(ClientId) -> %%------------------------------------------------------------------------------ -spec unregister(ClientId :: binary()) -> ok. unregister(ClientId) when is_binary(ClientId) -> - gen_server:cast(?SERVER, {unregister, ClientId, self()}). - -%%------------------------------------------------------------------------------ -%% @doc -%% Get statistics of client manager. -%% -%% @end -%%------------------------------------------------------------------------------ -getstats() -> - [{Name, emqttd_broker:getstat(Name)} || - Name <- ['clients/count', 'clients/max']]. + CmPid = gproc_pool:pick_worker(?POOL, ClientId), + gen_server:cast(CmPid, {unregister, ClientId, self()}). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([]) -> - TabId = ets:new(?CLIENT_TAB, [set, - named_table, - public, - {write_concurrency, true}]), +init([Id, TabId]) -> + gproc_pool:connect_worker(?POOL, {?MODULE, Id}), {ok, #state{tab = TabId}}. +handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> + case ets:lookup(Tab, ClientId) of + [{_, Pid, _}] -> + lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), + ignore; + [{_, OldPid, MRef}] -> + lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), + %%TODO: tell session old client is down here? + case emqttd_session:lookup(ClientId) of + undefined -> ok; + SessPid -> emqttd_session:client_down(SessPid, {OldPid, duplicate_id}) + end, + OldPid ! {stop, duplicate_id, Pid}, + erlang:demonitor(MRef), + ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}); + [] -> + ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) + end, + {reply, ok, State}; + handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), {reply, {error, badreq}, State}. -handle_cast({register, ClientId, Pid}, State=#state{tab = Tab}) -> - case registerd(Tab, {ClientId, Pid}) of - true -> - ignore; - false -> - ets:insert(Tab, {ClientId, Pid, erlang:monitor(process, Pid)}) - end, - {noreply, setstats(State)}; - -handle_cast({monitor, ClientId, Pid}, State = #state{tab = Tab}) -> - case ets:update_element(Tab, ClientId, {3, erlang:monitor(process, Pid)}) of - true -> ok; - false -> lager:error("failed to monitor clientId '~s' with pid ~p", [ClientId, Pid]) - end, - {noreply, setstats(State)}; - -handle_cast({unregister, ClientId, Pid}, State) -> - case ets:lookup(?CLIENT_TAB, ClientId) of +handle_cast({unregister, ClientId, Pid}, State = #state{tab = TabId}) -> + case ets:lookup(TabId, ClientId) of [{_, Pid, MRef}] -> erlang:demonitor(MRef, [flush]), - ets:delete(?CLIENT_TAB, ClientId); + ets:delete(TabId, ClientId); [_] -> ignore; [] -> @@ -158,8 +141,8 @@ handle_cast({unregister, ClientId, Pid}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?CLIENT_TAB, {'_', DownPid, MRef}), +handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{tab = TabId}) -> + ets:match_delete(TabId, {'_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -174,23 +157,10 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -registerd(Tab, {ClientId, Pid}) -> - case ets:lookup(Tab, ClientId) of - [{_, Pid, _}] -> - lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]), - true; - [{_, OldPid, MRef}] -> - lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]), - OldPid ! {stop, duplicate_id, Pid}, - erlang:demonitor(MRef), - false; - [] -> - false - end. -setstats(State) -> +setstats(State = #state{tab = TabId}) -> emqttd_broker:setstats('clients/count', 'clients/max', - ets:info(?CLIENT_TAB, size)), State. + ets:info(TabId, size)), State. diff --git a/apps/emqttd/src/emqttd_cm_sup.erl b/apps/emqttd/src/emqttd_cm_sup.erl new file mode 100644 index 000000000..853349b55 --- /dev/null +++ b/apps/emqttd/src/emqttd_cm_sup.erl @@ -0,0 +1,62 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd client manager supervisor. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_cm_sup). + +-author('feng@emqtt.io'). + +-include("emqttd.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0, table/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(CLIENT_TAB, mqtt_client). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +table() -> ?CLIENT_TAB. + +init([]) -> + TabId = ets:new(?CLIENT_TAB, [set, named_table, public, + {write_concurrency, true}]), + Schedulers = erlang:system_info(schedulers), + gproc_pool:new(cm, hash, [{size, Schedulers}]), + Children = lists:map( + fun(I) -> + Name = {emqttd_cm, I}, + gproc_pool:add_worker(cm, Name, I), + {Name, {emqttd_cm, start_link, [I, TabId]}, + permanent, 10000, worker, [emqttd_cm]} + end, lists:seq(1, Schedulers)), + {ok, {{one_for_all, 10, 100}, Children}}. + + diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index a2a9a3ecd..77c0ae7a3 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -42,7 +42,7 @@ -export([store/2]). %% Start gen_server --export([start_link/3]). +-export([start_link/3, client_down/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -252,9 +252,20 @@ initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), State#session_state{client_pid = ClientPid}. +%%------------------------------------------------------------------------------ +%% @doc Start a session process. +%% @end +%%------------------------------------------------------------------------------ start_link(SessOpts, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). +%%------------------------------------------------------------------------------ +%% @doc Notify the session process that client will be DOWN. +%% @end +%%------------------------------------------------------------------------------ +client_down(SessPid, {ClientPid, Reason}) -> + gen_server:cast(SessPid, {'DOWN', ClientPid, Reason}). + %%%============================================================================= %%% gen_server callbacks %%%============================================================================= @@ -279,7 +290,8 @@ handle_call({unsubscribe, Topics}, _From, State) -> {reply, ok, NewState}; handle_call(Req, _From, State) -> - {stop, {badreq, Req}, State}. + lager:error("Unexpected request: ~p", [Req]), + {reply, error, State}. handle_cast({resume, ClientId, ClientPid}, State = #session_state{ clientid = ClientId, @@ -336,8 +348,20 @@ handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s destroyed", [ClientId]), {stop, normal, State}; +handle_cast({resume, ClientId, ClientPid}, State) -> + lager:error("Cannot resume session ~p with pid ~p: ~p", + [ClientId, ClientPid, State]), + {noreply, State}; + +handle_cast({'DOWN', ClientPid, Reason}, State = #session_state{clientid = ClientId, + client_pid = ClientPid}) -> + lager:error("Session: client ~s@~p is down for ~p", [ClientId, ClientPid, Reason]), + unlink(ClientPid), + {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; + handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. + lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> F = fun(Message, S) -> dispatch(Message, S) end, @@ -347,18 +371,21 @@ handle_info({dispatch, {_From, Message}}, State) -> {noreply, dispatch(Message, State)}; handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId, - client_pid = ClientPid, - expires = Expires}) -> - lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), - Timer = erlang:send_after(Expires * 1000, self(), session_expired), - {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}}; + client_pid = ClientPid}) -> + lager:error("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), + {noreply, start_expire_timer(State#session_state{client_pid = undefined})}; + +handle_info({'EXIT', ClientPid, _Reason}, State = #session_state{client_pid = OtherClientPid}) -> + lager:error("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid, OtherClientPid]), + {noreply, State}; handle_info(session_expired, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s expired!", [ClientId]), {stop, {shutdown, expired}, State}; handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. + lager:critical("Unexpected Info: ~p, State: ~p", [Info, State]), + {noreply, State}. terminate(_Reason, _State) -> ok. @@ -393,4 +420,9 @@ next_msg_id(State = #session_state{message_id = 16#ffff}) -> next_msg_id(State = #session_state{message_id = MsgId}) -> State#session_state{message_id = MsgId + 1}. +start_expire_timer(State = #session_state{expires = Expires, + expire_timer = OldTimer}) -> + emqttd_utils:cancel_timer(OldTimer), + Timer = erlang:send_after(Expires * 1000, self(), session_expired), + State#session_state{expire_timer = Timer}. diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index 723efc532..f891d389f 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -48,7 +48,7 @@ start_session(ClientId, ClientPid) -> %%% Supervisor callbacks %%%============================================================================= init([SessOpts]) -> - {ok, {{simple_one_for_one, 0, 1}, + {ok, {{simple_one_for_one, 10, 10}, [{session, {emqttd_session, start_link, [SessOpts]}, transient, 10000, worker, [emqttd_session]}]}}. diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl index c5ceecef9..316f1854b 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_utils.erl @@ -27,7 +27,8 @@ -module(emqttd_utils). -export([apply_module_attributes/1, - all_module_attributes/1]). + all_module_attributes/1, + cancel_timer/1]). %% only {F, Args}... apply_module_attributes(Name) -> @@ -76,3 +77,8 @@ ignore_lib_apps(Apps) -> hipe, esockd, mochiweb], [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)]. + +cancel_timer(undefined) -> + undefined; +cancel_timer(Ref) -> + catch erlang:cancel_timer(Ref).