From fa25d6089361f4fccd492bed0267aebde75fb93d Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 10 Sep 2015 11:18:30 +0800 Subject: [PATCH] fix issue#280 --- src/emqttd_client.erl | 12 ++- src/emqttd_ctl.erl | 9 +- src/emqttd_session.erl | 191 +++++++++++++++++++-------------------- src/emqttd_sm.erl | 71 ++++++--------- src/emqttd_sm_helper.erl | 83 +++++++++++++++++ src/emqttd_sm_sup.erl | 16 ++-- src/emqttd_sup.erl | 3 +- 7 files changed, 231 insertions(+), 154 deletions(-) create mode 100644 src/emqttd_sm_helper.erl diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index f78422ac6..ed133856b 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -157,9 +157,17 @@ handle_info(Info, State = #state{peername = Peername}) -> lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), {noreply, State}. -terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) -> - lager:info("Client ~s terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), +terminate(Reason, #state{peername = Peername, + transport = Transport, + socket = Socket, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]), emqttd_keepalive:cancel(KeepAlive), + if + Reason == {shutdown, conn_closed} -> ok; + true -> Transport:fast_close(Socket) + end, case {ProtoState, Reason} of {undefined, _} -> ok; {_, {shutdown, Error}} -> diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index aaca6be26..febe8a764 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -176,10 +176,11 @@ sessions(["show", ClientId0]) -> listeners([]) -> lists:foreach(fun({{Protocol, Port}, Pid}) -> - ?PRINT("listener ~s:~p~n", [Protocol, Port]), - ?PRINT(" acceptors: ~p~n", [esockd:get_acceptors(Pid)]), - ?PRINT(" max_clients: ~p~n", [esockd:get_max_clients(Pid)]), - ?PRINT(" current_clients: ~p~n", [esockd:get_current_clients(Pid)]) + ?PRINT("listener ~s:~w~n", [Protocol, Port]), + ?PRINT(" acceptors: ~w~n", [esockd:get_acceptors(Pid)]), + ?PRINT(" max_clients: ~w~n", [esockd:get_max_clients(Pid)]), + ?PRINT(" current_clients: ~w~n", [esockd:get_current_clients(Pid)]), + ?PRINT(" shutdown_count: ~p~n", [esockd:get_shutdown_count(Pid)]) end, esockd:listeners()). bridges(["list"]) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 9a29ce5da..8314874f3 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -78,9 +78,12 @@ %% ClientId: Identifier of Session client_id :: binary(), - %% Client Pid linked with session + %% Client Pid bind with session client_pid :: pid(), + %% Client Monitor + client_mon :: reference(), + %% Last packet id of the session packet_id = 1, @@ -136,6 +139,8 @@ timestamp}). +-define(PUBSUB_TIMEOUT, 60000). + %%------------------------------------------------------------------------------ %% @doc Start a session. %% @end @@ -149,69 +154,69 @@ start_link(CleanSess, ClientId, ClientPid) -> %% @end %%------------------------------------------------------------------------------ -spec resume(pid(), mqtt_client_id(), pid()) -> ok. -resume(Session, ClientId, ClientPid) -> - gen_server2:cast(Session, {resume, ClientId, ClientPid}). +resume(SessPid, ClientId, ClientPid) -> + gen_server2:cast(SessPid, {resume, ClientId, ClientPid}). %%------------------------------------------------------------------------------ %% @doc Destroy a session. %% @end %%------------------------------------------------------------------------------ -spec destroy(pid(), mqtt_client_id()) -> ok. -destroy(Session, ClientId) -> - gen_server2:call(Session, {destroy, ClientId}). +destroy(SessPid, ClientId) -> + gen_server2:cast(SessPid, {destroy, ClientId}). %%------------------------------------------------------------------------------ %% @doc Subscribe Topics %% @end %%------------------------------------------------------------------------------ -spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}. -subscribe(Session, TopicTable) -> - gen_server2:call(Session, {subscribe, TopicTable}, infinity). +subscribe(SessPid, TopicTable) -> + gen_server2:call(SessPid, {subscribe, TopicTable}, ?PUBSUB_TIMEOUT). %%------------------------------------------------------------------------------ %% @doc Publish message %% @end %%------------------------------------------------------------------------------ -spec publish(pid(), mqtt_message()) -> ok. -publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> +publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) -> %% publish qos0 directly emqttd_pubsub:publish(Msg); -publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> +publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) -> %% publish qos1 directly, and client will puback automatically emqttd_pubsub:publish(Msg); -publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> +publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) -> %% publish qos2 by session - gen_server2:call(Session, {publish, Msg}). + gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT). %%------------------------------------------------------------------------------ %% @doc PubAck message %% @end %%------------------------------------------------------------------------------ -spec puback(pid(), mqtt_packet_id()) -> ok. -puback(Session, PktId) -> - gen_server2:cast(Session, {puback, PktId}). +puback(SessPid, PktId) -> + gen_server2:cast(SessPid, {puback, PktId}). -spec pubrec(pid(), mqtt_packet_id()) -> ok. -pubrec(Session, PktId) -> - gen_server2:cast(Session, {pubrec, PktId}). +pubrec(SessPid, PktId) -> + gen_server2:cast(SessPid, {pubrec, PktId}). -spec pubrel(pid(), mqtt_packet_id()) -> ok. -pubrel(Session, PktId) -> - gen_server2:cast(Session, {pubrel, PktId}). +pubrel(SessPid, PktId) -> + gen_server2:cast(SessPid, {pubrel, PktId}). -spec pubcomp(pid(), mqtt_packet_id()) -> ok. -pubcomp(Session, PktId) -> - gen_server2:cast(Session, {pubcomp, PktId}). +pubcomp(SessPid, PktId) -> + gen_server2:cast(SessPid, {pubcomp, PktId}). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topics %% @end %%------------------------------------------------------------------------------ -spec unsubscribe(pid(), [binary()]) -> ok. -unsubscribe(Session, Topics) -> - gen_server2:call(Session, {unsubscribe, Topics}, infinity). +unsubscribe(SessPid, Topics) -> + gen_server2:call(SessPid, {unsubscribe, Topics}, ?PUBSUB_TIMEOUT). %%%============================================================================= %%% gen_server callbacks @@ -219,8 +224,7 @@ unsubscribe(Session, Topics) -> init([CleanSess, ClientId, ClientPid]) -> process_flag(trap_exit, true), - true = link(ClientPid), - QEnv = emqttd:env(mqtt, queue), + QEnv = emqttd:env(mqtt, queue), SessEnv = emqttd:env(mqtt, session), Session = #session{ clean_sess = CleanSess, @@ -241,12 +245,13 @@ init([CleanSess, ClientId, ClientPid]) -> collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, info(Session)), + %% monitor client + MRef = erlang:monitor(process, ClientPid), %% start statistics - {ok, start_collector(Session), hibernate}. + {ok, start_collector(Session#session{client_mon = MRef}), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of - {destroy, _} -> 9; {unsubscribe, _} -> 2; {subscribe, _} -> 1; _ -> 0 @@ -254,6 +259,7 @@ prioritise_call(Msg, _From, _Len, _State) -> prioritise_cast(Msg, _Len, _State) -> case Msg of + {destroy, _} -> 10; {resume, _, _} -> 9; {pubrel, _PktId} -> 8; {pubcomp, _PktId} -> 8; @@ -264,6 +270,7 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of + {'DOWN', _, process, _, _} -> 10; {'EXIT', _, _} -> 10; session_expired -> 10; {timeout, _, _} -> 5; @@ -273,7 +280,7 @@ prioritise_info(Msg, _Len, _State) -> end. handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId, - subscriptions = Subscriptions}) -> + subscriptions = Subscriptions}) -> TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), @@ -282,18 +289,18 @@ handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = Clie emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), - lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p", + lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", [ClientId, TopicTable, GrantedQos]), Subscriptions1 = lists:foldl(fun({Topic, Qos}, Acc) -> case lists:keyfind(Topic, 1, Acc) of {Topic, Qos} -> - lager:warning([{client, ClientId}], "Session ~s " - "resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc; + lager:warning([{client, ClientId}], "Session(~s): " + "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc; {Topic, OldQos} -> - lager:warning([{client, ClientId}], "Session ~s " - "resubscribe ~p: old qos=~p, new qos=~p", [ClientId, Topic, OldQos, Qos]), + lager:warning([{client, ClientId}], "Session(~s): " + "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]), lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); false -> %%TODO: the design is ugly, rewrite later...:( @@ -314,7 +321,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client %% unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), - lager:info([{client, ClientId}], "Session ~s unsubscribe ~p", [ClientId, Topics]), + lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]), Subscriptions1 = lists:foldl(fun(Topic, Acc) -> @@ -322,7 +329,7 @@ handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = Client {Topic, _Qos} -> lists:keydelete(Topic, 1, Acc); false -> - lager:warning([{client, ClientId}], "Session ~s not subscribe ~s", [ClientId, Topic]), Acc + lager:warning([{client, ClientId}], "Session(~s) not subscribe ~s", [ClientId, Topic]), Acc end end, Subscriptions, Topics), @@ -338,36 +345,46 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message " + lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message " "for too many awaiting_rel: ~p", [ClientId, Msg]), {reply, {error, dropped}, Session} end; -handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) -> - lager:warning("Session ~s destroyed", [ClientId]), - {stop, {shutdown, destroy}, ok, Session}; - handle_call(Req, _From, State) -> lager:critical("Unexpected Request: ~p", [Req]), - {reply, {error, badreq}, State}. + {reply, ok, State}. + +handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> + lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), + {stop, {shutdown, destroy}, Session}; handle_cast({resume, ClientId, ClientPid}, Session) -> #session{client_id = ClientId, client_pid = OldClientPid, + client_mon = MRef, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, expired_timer = ETimer} = Session, - lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), + lager:info([{client, ClientId}], "Session(~s) resumed by ~p", [ClientId, ClientPid]), %% cancel expired timer cancel_timer(ETimer), - kick(ClientId, ClientPid, OldClientPid), - - true = link(ClientPid), + %% Kickout old client + if + OldClientPid == undefined -> + ok; + OldClientPid == ClientPid -> + ok; %% ?? + true -> + lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p", + [ClientId, ClientPid, OldClientPid]), + OldClientPid ! {stop, duplicate_id, ClientPid}, + erlang:demonitor(MRef, [flush]) + end, %% Redeliver PUBREL [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], @@ -379,6 +396,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], Session1 = Session#session{client_pid = ClientPid, + client_mon = erlang:monitor(process, ClientPid), awaiting_ack = #{}, awaiting_comp = #{}, expired_timer = undefined}, @@ -399,7 +417,7 @@ handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_a cancel_timer(TRef), noreply(dequeue(acked(PktId, Session))); error -> - lager:critical("Session(~s): cannot find PUBACK '~p'!", [ClientId, PktId]), + lager:error([{client, ClientId}], "Session(~s) cannot find PUBACK ~w", [ClientId, PktId]), noreply(Session) end; @@ -416,7 +434,7 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), noreply(dequeue(Session1)); error -> - lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), + lager:error([{client, ClientId}], "Session(~s) cannot find PUBREC ~w", [ClientId, PktId]), noreply(Session) end; @@ -429,7 +447,7 @@ handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId, emqttd_pubsub:publish(Msg), noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> - lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), + lager:error([{client, ClientId}], "Session(~s) cannot find PUBREL ~w", [ClientId, PktId]), noreply(Session) end; @@ -440,7 +458,7 @@ handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_ cancel_timer(TRef), noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); error -> - lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), + lager:error("Session(~s) cannot find PUBCOMP ~w", [ClientId, PktId]), noreply(Session) end; @@ -467,8 +485,6 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{messa true -> {noreply, deliver(Msg, Session)}; false -> - %%TODO: should emit alarm? - %%lager:error([{client, ClientId}], "Session ~s inflight queue is full!", [ClientId]), {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} end; @@ -487,35 +503,35 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = {ok, {{Retries, Timeout}, _TRef}} -> TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), - {noreply, Session#session{awaiting_ack = AwaitingAck1}}; + {noreply, Session#session{awaiting_ack = AwaitingAck1}, hibernate}; error -> % TODO: too many logs when overloaded... % lager:error([{client, ClientId}], "Session ~s " % "Cannot find Awaiting Ack:~p", [ClientId, PktId]), - {noreply, Session} + {noreply, Session, hibernate} end; handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of {ok, {Msg, _TRef}} -> - lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" + lager:error([{client, ClientId}], "Session(~s) AwaitingRel Timout!~n" "Drop Message:~p", [ClientId, Msg]), noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> - lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), - {noreply, Session} + lager:error([{client, ClientId}], "Session(~s) cannot find AwaitingRel ~w", [ClientId, PktId]), + {noreply, Session, hibernate} end; handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = Awaiting}) -> case maps:find(PktId, Awaiting) of {ok, _TRef} -> - lager:error([{client, ClientId}], "Session ~s " + lager:error([{client, ClientId}], "Session(~s) " "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); error -> - lager:error([{client, ClientId}], "Session ~s " + lager:error([{client, ClientId}], "Session(~s) " "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), noreply(Session) end; @@ -524,32 +540,29 @@ handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = emqttd_sm:register_session(CleanSess, ClientId, info(Session)), {noreply, start_collector(Session), hibernate}; -handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, - client_pid = ClientPid}) -> +handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = true, + client_pid = ClientPid}) -> {stop, normal, Session}; -handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - client_id = ClientId, - client_pid = ClientPid, - expired_after = Expires}) -> - lager:info("Session ~s unlink with client ~p: reason=~p", - [ClientId, ClientPid, Reason]), +handle_info({'DOWN', _MRef, process, ClientPid, _}, Session = #session{clean_sess = false, + client_pid = ClientPid, + expired_after = Expires}) -> TRef = timer(Expires, session_expired), - noreply(Session#session{client_pid = undefined, expired_timer = TRef}); + noreply(Session#session{client_pid = undefined, client_mon = undefined, expired_timer = TRef}); -handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, - client_pid = ClientPid}) -> - - lager:error("Session ~s received unexpected EXIT:" - " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), - {noreply, Session}; +handle_info({'DOWN', _MRef, process, Pid, Reason}, Session = #session{client_id = ClientId, + client_pid = ClientPid}) -> + lager:error([{client, ClientId}], "Session(~s): unexpected DOWN: " + "client_pid=~p, down_pid=~p, reason=~p", + [ClientId, ClientPid, Pid, Reason]), + noreply(Session); handle_info(session_expired, Session = #session{client_id = ClientId}) -> - lager:error("Session ~s expired, shutdown now!", [ClientId]), + lager:error("Session(~s) expired, shutdown now.", [ClientId]), {stop, {shutdown, expired}, Session}; handle_info(Info, Session = #session{client_id = ClientId}) -> - lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), + lager:critical("Session(~s) unexpected info: ~p", [ClientId, Info]), {noreply, Session}. terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> @@ -562,19 +575,6 @@ code_change(_OldVsn, Session, _Extra) -> %%% Internal functions %%%============================================================================= -%%------------------------------------------------------------------------------ -%% Kick duplicated client -%%------------------------------------------------------------------------------ - -kick(_ClientId, _ClientPid, undefined) -> - ok; -kick(_ClientId, ClientPid, ClientPid) -> - ok; -kick(ClientId, ClientPid, OldClientPid) -> - lager:warning("Session '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, ClientPid, OldClientPid]), - unlink(OldClientPid), - OldClientPid ! {stop, duplicate_id, ClientPid}. - %%------------------------------------------------------------------------------ %% Check inflight and awaiting_rel %%------------------------------------------------------------------------------ @@ -688,15 +688,14 @@ info(#session{clean_sess = CleanSess, awaiting_comp = AwaitingComp, timestamp = CreatedAt}) -> Stats = emqttd_mqueue:stats(MessageQueue), - [{pid, self()}, - {clean_sess, CleanSess}, - {subscriptions, Subscriptions}, - {max_inflight, MaxInflight}, + [{clean_sess, CleanSess}, + {subscriptions, Subscriptions}, + {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, - {message_queue, proplists:get_value(len, Stats)}, - {message_dropped, proplists:get_value(dropped, Stats)}, - {awaiting_rel, maps:size(AwaitingRel)}, - {awaiting_ack, maps:size(AwaitingAck)}, - {awaiting_comp, maps:size(AwaitingComp)}, - {created_at, CreatedAt}]. + {message_queue, proplists:get_value(len, Stats)}, + {message_dropped,proplists:get_value(dropped, Stats)}, + {awaiting_rel, maps:size(AwaitingRel)}, + {awaiting_ack, maps:size(AwaitingAck)}, + {awaiting_comp, maps:size(AwaitingComp)}, + {created_at, CreatedAt}]. diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 6d52c40ab..f2c268a03 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -38,7 +38,7 @@ -copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([start_link/2, pool/0]). +-export([start_link/1, pool/0]). -export([start_session/2, lookup_session/1]). @@ -53,7 +53,7 @@ %% gen_server2 priorities -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). --record(state, {id, statsfun}). +-record(state, {id}). -define(SM_POOL, sm_pool). @@ -64,11 +64,11 @@ mnesia(boot) -> %% global session... ok = emqttd_mnesia:create_table(session, [ - {type, ordered_set}, - {ram_copies, [node()]}, - {record_name, mqtt_session}, - {attributes, record_info(fields, mqtt_session)}, - {index, [sess_pid]}]); + {type, ordered_set}, + {ram_copies, [node()]}, + {record_name, mqtt_session}, + {attributes, record_info(fields, mqtt_session)}, + {index, [sess_pid]}]); mnesia(copy) -> ok = emqttd_mnesia:copy_table(session). @@ -81,11 +81,9 @@ mnesia(copy) -> %% @doc Start a session manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when - Id :: pos_integer(), - StatsFun :: fun(). -start_link(Id, StatsFun) -> - gen_server2:start_link(?MODULE, [Id, StatsFun], []). +-spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}. +start_link(Id) -> + gen_server2:start_link(?MODULE, [Id], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -119,14 +117,10 @@ lookup_session(ClientId) -> %%------------------------------------------------------------------------------ -spec register_session(CleanSess, ClientId, Info) -> ok when CleanSess :: boolean(), - ClientId :: binary(), - Info :: [tuple()]. -register_session(true, ClientId, Info) -> - ets:insert(mqtt_transient_session, {ClientId, Info}); - -register_session(false, ClientId, Info) -> - SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server2:cast(SM, {register, ClientId, Info}). + ClientId :: binary(), + Info :: [tuple()]. +register_session(CleanSess, ClientId, Info) -> + ets:insert(sesstab(CleanSess), {{ClientId, self()}, Info}). %%------------------------------------------------------------------------------ %% @doc Unregister a session. @@ -135,21 +129,24 @@ register_session(false, ClientId, Info) -> -spec unregister_session(CleanSess, ClientId) -> ok when CleanSess :: boolean(), ClientId :: binary(). -unregister_session(true, ClientId) -> - ets:delete(mqtt_transient_session, ClientId); -unregister_session(false, ClientId) -> - SM = gproc_pool:pick_worker(?SM_POOL, ClientId), - gen_server2:cast(SM, {unregister, ClientId}). +unregister_session(CleanSess, ClientId) -> + ets:delete(sesstab(CleanSess), {ClientId, self()}). -call(SM, Req) -> gen_server2:call(SM, Req, infinity). +sesstab(true) -> + mqtt_transient_session; +sesstab(false) -> + mqtt_persistent_session. + +call(SM, Req) -> + gen_server2:call(SM, Req, infinity). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([Id, StatsFun]) -> +init([Id]) -> gproc_pool:connect_worker(?SM_POOL, {?MODULE, Id}), - {ok, #state{id = Id, statsfun = StatsFun}}. + {ok, #state{id = Id}}. prioritise_call(_Msg, _From, _Len, _State) -> 1. @@ -186,15 +183,6 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call(_Request, _From, State) -> {reply, ok, State}. -%% persistent session -handle_cast({register, ClientId, Info}, State) -> - ets:insert(mqtt_persistent_session, {ClientId, Info}), - {noreply, setstats(State)}; - -handle_cast({unregister, ClientId}, State) -> - ets:delete(mqtt_persistent_session, ClientId), - {noreply, setstats(State)}; - handle_cast(Msg, State) -> lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. @@ -204,7 +192,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> [mnesia:delete_object(session, Sess, write) || Sess <- mnesia:index_read(session, DownPid, #mqtt_session.sess_pid)] end), - {noreply, setstats(State)}; + {noreply, State}; handle_info(Info, State) -> lager:critical("Unexpected Info: ~p", [Info]), @@ -325,10 +313,5 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, end. remove_session(Session) -> - mnesia:transaction(fun() -> - mnesia:delete_object(session, Session, write) - end). - -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(mqtt_persistent_session, size)), State. + mnesia:transaction(fun() -> mnesia:delete_object(session, Session, write) end). diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl new file mode 100644 index 000000000..d498cfac3 --- /dev/null +++ b/src/emqttd_sm_helper.erl @@ -0,0 +1,83 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd session helper. +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +%% TODO: Monitor mnesia node down... + +-module(emqttd_sm_helper). + +-author("Feng Lee "). + +-include("emqttd.hrl"). + +%% API Function Exports +-export([start_link/0]). + +-behaviour(gen_server). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {statsfun, ticker}). + +%%------------------------------------------------------------------------------ +%% @doc Start a session helper +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. +start_link() -> + gen_server:start_link(?MODULE, [], []). + +init([]) -> + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), + {ok, TRef} = timer:send_interval(1000, self(), tick), + {ok, #state{statsfun = StatsFun, ticker = TRef}}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(Msg, State) -> + lager:critical("Unexpected Msg: ~p", [Msg]), + {noreply, State}. + +handle_info(tick, State) -> + {noreply, setstats(State)}; + +handle_info(Info, State) -> + lager:critical("Unexpected Info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State = #state{ticker = TRef}) -> + timer:cancel(TRef). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +setstats(State = #state{statsfun = StatsFun}) -> + StatsFun(ets:info(mqtt_persistent_session, size)), State. + + diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 468016795..2c49ef01e 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -31,6 +31,9 @@ -include("emqttd.hrl"). +-define(CHILD(Mod), {Mod, {Mod, start_link, []}, + permanent, 5000, worker, [Mod]}). + %% API -export([start_link/0]). @@ -46,15 +49,14 @@ init([]) -> init_session_ets(), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), - StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - Children = lists:map( + Managers = lists:map( fun(I) -> - Name = {emqttd_sm, I}, - gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I, StatsFun]}, - permanent, 10000, worker, [emqttd_sm]} + Name = {emqttd_sm, I}, + gproc_pool:add_worker(emqttd_sm:pool(), Name, I), + {Name, {emqttd_sm, start_link, [I]}, + permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), - {ok, {{one_for_all, 10, 100}, Children}}. + {ok, {{one_for_all, 10, 100}, [?CHILD(emqttd_sm_helper) | Managers]}}. init_session_ets() -> Tables = [mqtt_transient_session, mqtt_persistent_session], diff --git a/src/emqttd_sup.erl b/src/emqttd_sup.erl index 961dd91fd..ceeb1fc12 100644 --- a/src/emqttd_sup.erl +++ b/src/emqttd_sup.erl @@ -40,7 +40,8 @@ -export([init/1]). %% Helper macro for declaring children of supervisor --define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, permanent, 5000, Type, [Mod]}). +-define(CHILD(Mod, Type), {Mod, {Mod, start_link, []}, + permanent, 5000, Type, [Mod]}). %%%============================================================================= %%% API