fix sessions

This commit is contained in:
Feng Lee 2015-06-24 22:05:00 +08:00
parent 0f68186472
commit d09a0787ea
6 changed files with 16 additions and 253 deletions

View File

@ -70,13 +70,12 @@ print_vsn() ->
start_servers(Sup) ->
Servers = [{"emqttd trace", emqttd_trace},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
{"emqttd session manager", {supervisor, emqttd_sm_sup}},
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd broker", emqttd_broker},
{"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics},
{"emqttd broker", emqttd_broker},
{"emqttd alarm", emqttd_alarm},
{"emqttd mode supervisor", emqttd_mod_sup},
{"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}},

View File

@ -1,171 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% MQTT Client Manager
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_cm).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% API Exports
-export([start_link/2, pool/0, table/0]).
-export([lookup/1, register/1, unregister/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {id, tab, statsfun}).
-define(CM_POOL, cm_pool).
-define(CLIENT_TAB, mqtt_client).
%%%=============================================================================
%%% API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc Start client manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
pool() -> ?CM_POOL.
table() -> ?CLIENT_TAB.
%%------------------------------------------------------------------------------
%% @doc Lookup client pid with clientId
%% @end
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined.
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[Client] -> Client;
[] -> undefined
end.
%%------------------------------------------------------------------------------
%% @doc Register clientId with pid.
%% @end
%%------------------------------------------------------------------------------
-spec register(Client :: mqtt_client()) -> ok.
register(Client = #mqtt_client{clientid = ClientId}) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:call(CmPid, {register, Client}, infinity).
%%------------------------------------------------------------------------------
%% @doc Unregister clientId with pid.
%% @end
%%------------------------------------------------------------------------------
-spec unregister(ClientId :: binary()) -> ok.
unregister(ClientId) when is_binary(ClientId) ->
CmPid = gproc_pool:pick_worker(?CM_POOL, ClientId),
gen_server:cast(CmPid, {unregister, ClientId, self()}).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([Id, StatsFun]) ->
gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}.
handle_call({register, Client = #mqtt_client{clientid = ClientId, client_pid = Pid}}, _From, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[#mqtt_client{client_pid = Pid}] ->
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
ignore;
[#mqtt_client{client_pid = OldPid, client_mon = MRef}] ->
lager:error("clientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]),
OldPid ! {stop, duplicate_id, Pid},
erlang:demonitor(MRef),
ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)});
[] ->
ets:insert(?CLIENT_TAB, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)})
end,
{reply, ok, setstats(State)};
handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}.
handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[#mqtt_client{client_pid = Pid, client_mon = MRef}] ->
erlang:demonitor(MRef, [flush]),
ets:delete(?CLIENT_TAB, ClientId);
[_] ->
ignore;
[] ->
lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
end,
{noreply, setstats(State)};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, Reason}, State) ->
case ets:match_object(?CLIENT_TAB, {mqtt_client, '$1', '_', '_', DownPid, MRef, '_', '_'}) of
[] ->
ignore;
Clients ->
lists:foreach(
fun(Client = #mqtt_client{clientid = ClientId}) ->
ets:delete_object(?CLIENT_TAB, Client),
lager:error("Client ~s is Down: ~p", [ClientId, Reason]),
emqttd_broker:foreach_hooks(client_disconnected, [Reason, ClientId])
end, Clients)
end,
{noreply, setstats(State)};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{id = Id}) ->
gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(?CLIENT_TAB, size)), State.

View File

@ -1,59 +0,0 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd client manager supervisor.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_cm_sup).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ets:new(emqttd_cm:table(), [ordered_set, named_table, public, {keypos, 2},
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_cm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_cm, I},
gproc_pool:add_worker(emqttd_cm:pool(), Name, I),
{Name, {emqttd_cm, start_link, [I, StatsFun]},
permanent, 10000, worker, [emqttd_cm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.

View File

@ -63,8 +63,7 @@
%%------------------------------------------------------------------------------
init(Peername, SendFun, Opts) ->
MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
#proto_state{
peername = Peername,
#proto_state{peername = Peername,
sendfun = SendFun,
max_clientid_len = MaxLen,
client_pid = self()}.
@ -145,9 +144,6 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
%% Generate clientId if null
State2 = State1#proto_state{clientid = clientid(ClientId, State1)},
%% Register the client to cm
emqttd_cm:register(client(State2)),
%%Starting session
{ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
@ -166,7 +162,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
{ReturnCode, State1}
end,
%% Run hooks
emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]),
emqttd_broker:foreach_hooks('client.connected', [ReturnCode1, client(State3)]),
%% Send connack
send(?CONNACK_PACKET(ReturnCode1), State3);
@ -293,8 +289,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg
lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
[ClientId, emqttd_net:format(Peername), Error]),
send_willmsg(ClientId, WillMsg),
try_unregister(ClientId),
emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]).
emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]).
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqttd_message:from_packet(Packet).
@ -387,9 +382,6 @@ validate_qos(undefined) -> true;
validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false.
try_unregister(undefined) -> ok;
try_unregister(ClientId) -> emqttd_cm:unregister(ClientId).
%% publish ACL is cached in process dictionary.
check_acl(publish, Topic, State) ->
case get({acl, publish, Topic}) of

View File

@ -53,7 +53,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {id, statsfun}).
-record(state, {id, client_statsfun, sess_statsfun}).
-define(SM_POOL, sm_pool).
@ -67,11 +67,12 @@
%% @doc Start a session manager
%% @end
%%------------------------------------------------------------------------------
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
-spec start_link(Id, ClientStatsFun, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(),
StatsFun :: fun().
start_link(Id, StatsFun) ->
gen_server:start_link(?MODULE, [Id, StatsFun], []).
ClientStatsFun :: fun(),
SessStatsFun :: fun().
start_link(Id, ClientStatsFun, SessStatsFun) ->
gen_server:start_link(?MODULE, [Id, ClientStatsFun, SessStatsFun], []).
%%------------------------------------------------------------------------------
%% @doc Pool name.

View File

@ -46,12 +46,13 @@ init([]) ->
{write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Children = lists:map(
fun(I) ->
Name = {emqttd_sm, I},
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, StatsFun]},
{Name, {emqttd_sm, start_link, [I, ClientStatsFun, SessStatsFun]},
permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}.