From 1935b414c70f350f6f416182449c5e46ad009fb2 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 10 Oct 2015 13:12:00 +0800 Subject: [PATCH] issue #282 - improve sm --- include/emqttd.hrl | 7 +- src/emqttd_sm.erl | 139 ++++++++++++++++----------------------- src/emqttd_sm_helper.erl | 37 ++++++++--- 3 files changed, 87 insertions(+), 96 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index 53a5ba77d..5c2f61019 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -105,10 +105,9 @@ %% MQTT Session %%------------------------------------------------------------------------------ -record(mqtt_session, { - client_id, - sess_pid, - persistent, - on_node + client_id :: binary(), + sess_pid :: pid(), + persistent :: boolean() }). -type mqtt_session() :: #mqtt_session{}. diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 99c99c83f..aeada1bc6 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -57,7 +57,6 @@ -define(SM_POOL, ?MODULE). -%% TODO... -define(SESSION_TIMEOUT, 60000). %%%============================================================================= @@ -158,7 +157,7 @@ prioritise_cast(_Msg, _Len, _State) -> 0. prioritise_info(_Msg, _Len, _State) -> - 1. + 2. %% persistent session handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> @@ -170,6 +169,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> {reply, resume_session(Session, ClientPid), State} end; +%% transient session handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> case lookup_session(ClientId) of undefined -> @@ -216,12 +216,11 @@ create_session(CleanSess, ClientId, ClientPid) -> {ok, SessPid} -> Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, - persistent = not CleanSess, - on_node = node()}, + persistent = not CleanSess}, case insert_session(Session) of - {aborted, {conflict, Node}} -> - %% conflict with othe node? - lager:critical("Session ~s conflict with node ~p!", [ClientId, Node]), + {aborted, {conflict, ConflictPid}} -> + %% Conflict with othe node? + lager:critical("Session(~s): Conflict with ~p!", [ClientId, ConflictPid]), {error, conflict}; {atomic, ok} -> erlang:monitor(process, SessPid), @@ -232,89 +231,65 @@ create_session(CleanSess, ClientId, ClientPid) -> end. insert_session(Session = #mqtt_session{client_id = ClientId}) -> - mnesia:transaction(fun() -> - case mnesia:wread({session, ClientId}) of - [] -> - mnesia:write(session, Session, write); - [#mqtt_session{on_node = Node}] -> - mnesia:abort({conflict, Node}) - end - end). + mnesia:transaction( + fun() -> + case mnesia:wread({session, ClientId}) of + [] -> + mnesia:write(session, Session, write); + [#mqtt_session{sess_pid = SessPid}] -> + mnesia:abort({conflict, SessPid}) + end + end). -%% local node +%% Local node resume_session(#mqtt_session{client_id = ClientId, - sess_pid = SessPid, - on_node = Node}, ClientPid) - when Node =:= node() -> - case is_process_alive(SessPid) of - true -> - emqttd_session:resume(SessPid, ClientId, ClientPid), + sess_pid = SessPid}, ClientPid) + when node(SessPid) =:= node() -> + + emqttd_session:resume(SessPid, ClientId, ClientPid), + {ok, SessPid}; + +%% Remote node +resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) -> + Node = node(SessPid), + case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of + ok -> {ok, SessPid}; - false -> - lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]), - {error, session_died} - end; - -%% remote node -resume_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid, - on_node = Node}, ClientPid) -> - case emqttd:is_running(Node) of - true -> - case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of - ok -> - {ok, SessPid}; - {badrpc, Reason} -> - lager:critical("Resume session ~s on remote node ~p failed for ~p", - [ClientId, Node, Reason]), - {error, Reason} - end; - false -> - lager:critical("Session ~s died for node ~p down!", [ClientId, Node]), + {badrpc, nodedown} -> + lager:critical("Session(~s): Died for node ~s down!", [ClientId, Node]), remove_session(Session), - {error, session_node_down} + {error, session_nodedown}; + {badrpc, Reason} -> + lager:critical("Session(~s): Failed to resume from node ~s for ~p", + [ClientId, Node, Reason]), + {error, Reason} end. -%% local node -destroy_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid, - on_node = Node}) when Node =:= node() -> - case is_process_alive(SessPid) of - true -> - emqttd_session:destroy(SessPid, ClientId); - false -> - lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]) - end, - case remove_session(Session) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end; +%% Local node +destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}) + when node(SessPid) =:= node() -> + emqttd_session:destroy(SessPid, ClientId), + remove_session(Session); -%% remote node +%% Remote node destroy_session(Session = #mqtt_session{client_id = ClientId, - sess_pid = SessPid, - on_node = Node}) -> - case emqttd:is_running(Node) of - true -> - case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of - ok -> - case remove_session(Session) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end; - {badrpc, Reason} -> - lager:critical("Destroy session ~s on remote node ~p failed for ~p", - [ClientId, Node, Reason]), - {error, list_to_atom("session_" ++ atom_to_list(Reason))} - end; - false -> - lager:error("Session ~s died for node ~p down!", [ClientId, Node]), - case remove_session(Session) of - {atomic, ok} -> ok; - {aborted, Error} -> {error, Error} - end - end. + sess_pid = SessPid}) -> + Node = node(SessPid), + case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of + ok -> + remove_session(Session); + {badrpc, nodedown} -> + lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]), + remove_session(Session); + {badrpc, Reason} -> + lager:error("Session(~s): Failed to destory ~p on remote node ~p for ~s", + [ClientId, SessPid, Node, Reason]), + {error, Reason} + end. remove_session(Session) -> - mnesia:transaction(fun() -> mnesia:delete_object(session, Session, write) end). + case mnesia:transaction(fun mnesia:delete_object/3, [session, Session, write]) of + {atomic, ok} -> ok; + {aborted, Error} -> {error, Error} + end. diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index b3a109ecd..0c32c6530 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -25,14 +25,14 @@ %%% @end %%%----------------------------------------------------------------------------- -%% TODO: Monitor mnesia node down... - -module(emqttd_sm_helper). -author("Feng Lee "). -include("emqttd.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + %% API Function Exports -export([start_link/0]). @@ -42,7 +42,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {statsfun, ticker}). +-record(state, {stats_fun, tick_tref}). %%------------------------------------------------------------------------------ %% @doc Start a session helper @@ -53,9 +53,10 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> + %%mnesia:subscribe(system), + {ok, TRef} = timer:send_interval(1000, tick), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), - {ok, TRef} = timer:send_interval(1000, self(), tick), - {ok, #state{statsfun = StatsFun, ticker = TRef}}. + {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -64,20 +65,36 @@ handle_cast(Msg, State) -> lager:critical("Unexpected Msg: ~p", [Msg]), {noreply, State}. +handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + lager:error("!!!Mnesia node down: ~s", [Node]), + Fun = fun() -> + ClientIds = + mnesia:select(session, [{#mqtt_session{client_id = '$1', sess_pid = '$2'}, + [{'==', {node, '$2'}, Node}], + ['$1']}]), + lists:foreach(fun(Id) -> mnesia:delete({session, Id}) end, ClientIds) + end, + mnesia:async_dirty(Fun), + {noreply, State}; + handle_info(tick, State) -> - {noreply, setstats(State)}; + {noreply, setstats(State), hibernate}; handle_info(Info, State) -> lager:critical("Unexpected Info: ~p", [Info]), {noreply, State}. -terminate(_Reason, _State = #state{ticker = TRef}) -> - timer:cancel(TRef). +terminate(_Reason, _State = #state{tick_tref = TRef}) -> + timer:cancel(TRef), + mnesia:unsubscribe(system). code_change(_OldVsn, State, _Extra) -> {ok, State}. -setstats(State = #state{statsfun = StatsFun}) -> +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + +setstats(State = #state{stats_fun = StatsFun}) -> StatsFun(ets:info(mqtt_persistent_session, size)), State. -