issue #282 - improve sm

This commit is contained in:
Feng Lee 2015-10-10 13:12:00 +08:00
parent d63f043566
commit 1935b414c7
3 changed files with 87 additions and 96 deletions

View File

@ -105,10 +105,9 @@
%% MQTT Session %% MQTT Session
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_session, { -record(mqtt_session, {
client_id, client_id :: binary(),
sess_pid, sess_pid :: pid(),
persistent, persistent :: boolean()
on_node
}). }).
-type mqtt_session() :: #mqtt_session{}. -type mqtt_session() :: #mqtt_session{}.

View File

@ -57,7 +57,6 @@
-define(SM_POOL, ?MODULE). -define(SM_POOL, ?MODULE).
%% TODO...
-define(SESSION_TIMEOUT, 60000). -define(SESSION_TIMEOUT, 60000).
%%%============================================================================= %%%=============================================================================
@ -158,7 +157,7 @@ prioritise_cast(_Msg, _Len, _State) ->
0. 0.
prioritise_info(_Msg, _Len, _State) -> prioritise_info(_Msg, _Len, _State) ->
1. 2.
%% persistent session %% persistent session
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
@ -170,6 +169,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
{reply, resume_session(Session, ClientPid), State} {reply, resume_session(Session, ClientPid), State}
end; end;
%% transient session
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
@ -216,12 +216,11 @@ create_session(CleanSess, ClientId, ClientPid) ->
{ok, SessPid} -> {ok, SessPid} ->
Session = #mqtt_session{client_id = ClientId, Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid,
persistent = not CleanSess, persistent = not CleanSess},
on_node = node()},
case insert_session(Session) of case insert_session(Session) of
{aborted, {conflict, Node}} -> {aborted, {conflict, ConflictPid}} ->
%% conflict with othe node? %% Conflict with othe node?
lager:critical("Session ~s conflict with node ~p!", [ClientId, Node]), lager:critical("Session(~s): Conflict with ~p!", [ClientId, ConflictPid]),
{error, conflict}; {error, conflict};
{atomic, ok} -> {atomic, ok} ->
erlang:monitor(process, SessPid), erlang:monitor(process, SessPid),
@ -232,89 +231,65 @@ create_session(CleanSess, ClientId, ClientPid) ->
end. end.
insert_session(Session = #mqtt_session{client_id = ClientId}) -> insert_session(Session = #mqtt_session{client_id = ClientId}) ->
mnesia:transaction(fun() -> mnesia:transaction(
case mnesia:wread({session, ClientId}) of fun() ->
[] -> case mnesia:wread({session, ClientId}) of
mnesia:write(session, Session, write); [] ->
[#mqtt_session{on_node = Node}] -> mnesia:write(session, Session, write);
mnesia:abort({conflict, Node}) [#mqtt_session{sess_pid = SessPid}] ->
end mnesia:abort({conflict, SessPid})
end). end
end).
%% local node %% Local node
resume_session(#mqtt_session{client_id = ClientId, resume_session(#mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid}, ClientPid)
on_node = Node}, ClientPid) when node(SessPid) =:= node() ->
when Node =:= node() ->
case is_process_alive(SessPid) of emqttd_session:resume(SessPid, ClientId, ClientPid),
true -> {ok, SessPid};
emqttd_session:resume(SessPid, ClientId, ClientPid),
%% Remote node
resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->
Node = node(SessPid),
case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of
ok ->
{ok, SessPid}; {ok, SessPid};
false -> {badrpc, nodedown} ->
lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]), lager:critical("Session(~s): Died for node ~s down!", [ClientId, Node]),
{error, session_died}
end;
%% remote node
resume_session(Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid,
on_node = Node}, ClientPid) ->
case emqttd:is_running(Node) of
true ->
case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of
ok ->
{ok, SessPid};
{badrpc, Reason} ->
lager:critical("Resume session ~s on remote node ~p failed for ~p",
[ClientId, Node, Reason]),
{error, Reason}
end;
false ->
lager:critical("Session ~s died for node ~p down!", [ClientId, Node]),
remove_session(Session), remove_session(Session),
{error, session_node_down} {error, session_nodedown};
{badrpc, Reason} ->
lager:critical("Session(~s): Failed to resume from node ~s for ~p",
[ClientId, Node, Reason]),
{error, Reason}
end. end.
%% local node %% Local node
destroy_session(Session = #mqtt_session{client_id = ClientId, destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid})
sess_pid = SessPid, when node(SessPid) =:= node() ->
on_node = Node}) when Node =:= node() -> emqttd_session:destroy(SessPid, ClientId),
case is_process_alive(SessPid) of remove_session(Session);
true ->
emqttd_session:destroy(SessPid, ClientId);
false ->
lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid])
end,
case remove_session(Session) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end;
%% remote node %% Remote node
destroy_session(Session = #mqtt_session{client_id = ClientId, destroy_session(Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid}) ->
on_node = Node}) -> Node = node(SessPid),
case emqttd:is_running(Node) of case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
true -> ok ->
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of remove_session(Session);
ok -> {badrpc, nodedown} ->
case remove_session(Session) of lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]),
{atomic, ok} -> ok; remove_session(Session);
{aborted, Error} -> {error, Error} {badrpc, Reason} ->
end; lager:error("Session(~s): Failed to destory ~p on remote node ~p for ~s",
{badrpc, Reason} -> [ClientId, SessPid, Node, Reason]),
lager:critical("Destroy session ~s on remote node ~p failed for ~p", {error, Reason}
[ClientId, Node, Reason]), end.
{error, list_to_atom("session_" ++ atom_to_list(Reason))}
end;
false ->
lager:error("Session ~s died for node ~p down!", [ClientId, Node]),
case remove_session(Session) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end
end.
remove_session(Session) -> remove_session(Session) ->
mnesia:transaction(fun() -> mnesia:delete_object(session, Session, write) end). case mnesia:transaction(fun mnesia:delete_object/3, [session, Session, write]) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end.

View File

@ -25,14 +25,14 @@
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% TODO: Monitor mnesia node down...
-module(emqttd_sm_helper). -module(emqttd_sm_helper).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -42,7 +42,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {statsfun, ticker}). -record(state, {stats_fun, tick_tref}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a session helper %% @doc Start a session helper
@ -53,9 +53,10 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) -> init([]) ->
%%mnesia:subscribe(system),
{ok, TRef} = timer:send_interval(1000, tick),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, TRef} = timer:send_interval(1000, self(), tick), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
{ok, #state{statsfun = StatsFun, ticker = TRef}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
@ -64,20 +65,36 @@ handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:critical("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
lager:error("!!!Mnesia node down: ~s", [Node]),
Fun = fun() ->
ClientIds =
mnesia:select(session, [{#mqtt_session{client_id = '$1', sess_pid = '$2'},
[{'==', {node, '$2'}, Node}],
['$1']}]),
lists:foreach(fun(Id) -> mnesia:delete({session, Id}) end, ClientIds)
end,
mnesia:async_dirty(Fun),
{noreply, State};
handle_info(tick, State) -> handle_info(tick, State) ->
{noreply, setstats(State)}; {noreply, setstats(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:critical("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{ticker = TRef}) -> terminate(_Reason, _State = #state{tick_tref = TRef}) ->
timer:cancel(TRef). timer:cancel(TRef),
mnesia:unsubscribe(system).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
setstats(State = #state{statsfun = StatsFun}) -> %%%=============================================================================
%%% Internal functions
%%%=============================================================================
setstats(State = #state{stats_fun = StatsFun}) ->
StatsFun(ets:info(mqtt_persistent_session, size)), State. StatsFun(ets:info(mqtt_persistent_session, size)), State.