Merge pull request #331 from emqtt/dev-feng

fix issue #282, and fix some 'critical' logs
This commit is contained in:
Feng Lee 2015-10-10 14:43:40 +08:00
commit f84a5d263a
10 changed files with 108 additions and 108 deletions

View File

@ -105,10 +105,9 @@
%% MQTT Session %% MQTT Session
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_session, { -record(mqtt_session, {
client_id, client_id :: binary(),
sess_pid, sess_pid :: pid(),
persistent, persistent :: boolean()
on_node
}). }).
-type mqtt_session() :: #mqtt_session{}. -type mqtt_session() :: #mqtt_session{}.

View File

@ -257,7 +257,7 @@ handle_call({unhook, Hook, Name}, _From, State) ->
{reply, Reply, State}; {reply, Reply, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:critical("Unexpected request: ~p", [Req]), lager:error("Unexpected request: ~p", [Req]),
{reply, {error, badreq}, State}. {reply, {error, badreq}, State}.
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->

View File

@ -110,7 +110,7 @@ handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(Req, _From, State = #state{peername = Peername}) -> handle_call(Req, _From, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]), lager:error("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
{reply, {error, unsupported_request}, State}. {reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) -> handle_cast({subscribe, TopicTable}, State) ->
@ -120,7 +120,7 @@ handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
handle_cast(Msg, State = #state{peername = Peername}) -> handle_cast(Msg, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]), lager:error("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
{noreply, State}. {noreply, State}.
handle_info(timeout, State) -> handle_info(timeout, State) ->
@ -152,7 +152,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
network_error(Reason, State); network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) -> handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]), lager:error("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
{noreply, State}; {noreply, State};
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) -> handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
@ -180,7 +180,7 @@ handle_info({keepalive, check}, State = #state{peername = Peername, keepalive =
end; end;
handle_info(Info, State = #state{peername = Peername}) -> handle_info(Info, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]), lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
{noreply, State}. {noreply, State}.
terminate(Reason, #state{peername = Peername, terminate(Reason, #state{peername = Peername,

View File

@ -129,11 +129,11 @@ handle_cast({unregister, ClientId, Pid}, State) ->
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Msg: ~p", [Info]), lager:error("Unexpected Msg: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{id = Id}) -> terminate(_Reason, #state{id = Id}) ->

View File

@ -385,7 +385,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
end; end;
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View File

@ -162,7 +162,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) -> handle_info(stats, State = #state{stats_fun = StatsFun}) ->
@ -174,7 +174,7 @@ handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -> terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) ->

View File

@ -293,7 +293,7 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
end; end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:critical("Unexpected Request: ~p", [Req]), lager:error("Unexpected Request: ~p", [Req]),
{reply, ok, State}. {reply, ok, State}.
handle_cast({subscribe, TopicTable0, Callback}, Session = #session{ handle_cast({subscribe, TopicTable0, Callback}, Session = #session{
@ -469,7 +469,7 @@ handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_
end; end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p, State: ~p", [Msg, State]), lager:error("Unexpected Msg: ~p, State: ~p", [Msg, State]),
{noreply, State}. {noreply, State}.
%% Queue messages when client is offline %% Queue messages when client is offline
@ -570,7 +570,7 @@ handle_info(session_expired, Session = #session{client_id = ClientId}) ->
{stop, {shutdown, expired}, Session}; {stop, {shutdown, expired}, Session};
handle_info(Info, Session = #session{client_id = ClientId}) -> handle_info(Info, Session = #session{client_id = ClientId}) ->
lager:critical("Session(~s) unexpected info: ~p", [ClientId, Info]), lager:error("Session(~s) unexpected info: ~p", [ClientId, Info]),
{noreply, Session}. {noreply, Session}.
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->

View File

@ -57,7 +57,6 @@
-define(SM_POOL, ?MODULE). -define(SM_POOL, ?MODULE).
%% TODO...
-define(SESSION_TIMEOUT, 60000). -define(SESSION_TIMEOUT, 60000).
%%%============================================================================= %%%=============================================================================
@ -158,7 +157,7 @@ prioritise_cast(_Msg, _Len, _State) ->
0. 0.
prioritise_info(_Msg, _Len, _State) -> prioritise_info(_Msg, _Len, _State) ->
1. 2.
%% persistent session %% persistent session
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
@ -170,6 +169,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
{reply, resume_session(Session, ClientPid), State} {reply, resume_session(Session, ClientPid), State}
end; end;
%% transient session
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
@ -187,7 +187,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
@ -198,7 +198,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{id = Id}) -> terminate(_Reason, #state{id = Id}) ->
@ -216,12 +216,11 @@ create_session(CleanSess, ClientId, ClientPid) ->
{ok, SessPid} -> {ok, SessPid} ->
Session = #mqtt_session{client_id = ClientId, Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid,
persistent = not CleanSess, persistent = not CleanSess},
on_node = node()},
case insert_session(Session) of case insert_session(Session) of
{aborted, {conflict, Node}} -> {aborted, {conflict, ConflictPid}} ->
%% conflict with othe node? %% Conflict with othe node?
lager:critical("Session ~s conflict with node ~p!", [ClientId, Node]), lager:error("Session(~s): Conflict with ~p!", [ClientId, ConflictPid]),
{error, conflict}; {error, conflict};
{atomic, ok} -> {atomic, ok} ->
erlang:monitor(process, SessPid), erlang:monitor(process, SessPid),
@ -232,89 +231,71 @@ create_session(CleanSess, ClientId, ClientPid) ->
end. end.
insert_session(Session = #mqtt_session{client_id = ClientId}) -> insert_session(Session = #mqtt_session{client_id = ClientId}) ->
mnesia:transaction(fun() -> mnesia:transaction(
fun() ->
case mnesia:wread({session, ClientId}) of case mnesia:wread({session, ClientId}) of
[] -> [] ->
mnesia:write(session, Session, write); mnesia:write(session, Session, write);
[#mqtt_session{on_node = Node}] -> [#mqtt_session{sess_pid = SessPid}] ->
mnesia:abort({conflict, Node}) mnesia:abort({conflict, SessPid})
end end
end). end).
%% local node %% Local node
resume_session(#mqtt_session{client_id = ClientId, resume_session(#mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid}, ClientPid)
on_node = Node}, ClientPid) when node(SessPid) =:= node() ->
when Node =:= node() ->
case is_process_alive(SessPid) of case is_process_alive(SessPid) of
true -> true ->
emqttd_session:resume(SessPid, ClientId, ClientPid), emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid}; {ok, SessPid};
false -> false ->
lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid]), lager:error("Session(~s): Cannot resume ~p, it seems already dead!", [ClientId, SessPid]),
{error, session_died} {error, session_died}
end; end;
%% remote node %% Remote node
resume_session(Session = #mqtt_session{client_id = ClientId, resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->
sess_pid = SessPid, Node = node(SessPid),
on_node = Node}, ClientPid) ->
case emqttd:is_running(Node) of
true ->
case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of case rpc:call(Node, emqttd_session, resume, [SessPid, ClientId, ClientPid]) of
ok -> ok ->
{ok, SessPid}; {ok, SessPid};
{badrpc, nodedown} ->
lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]),
remove_session(Session),
{error, session_nodedown};
{badrpc, Reason} -> {badrpc, Reason} ->
lager:critical("Resume session ~s on remote node ~p failed for ~p", lager:error("Session(~s): Failed to resume from node ~s for ~p",
[ClientId, Node, Reason]), [ClientId, Node, Reason]),
{error, Reason} {error, Reason}
end;
false ->
lager:critical("Session ~s died for node ~p down!", [ClientId, Node]),
remove_session(Session),
{error, session_node_down}
end. end.
%% local node %% Local node
destroy_session(Session = #mqtt_session{client_id = ClientId, destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid})
sess_pid = SessPid, when node(SessPid) =:= node() ->
on_node = Node}) when Node =:= node() -> emqttd_session:destroy(SessPid, ClientId),
case is_process_alive(SessPid) of remove_session(Session);
true ->
emqttd_session:destroy(SessPid, ClientId);
false ->
lager:critical("Session ~s@~p died unexpectedly!", [ClientId, SessPid])
end,
case remove_session(Session) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end;
%% remote node %% Remote node
destroy_session(Session = #mqtt_session{client_id = ClientId, destroy_session(Session = #mqtt_session{client_id = ClientId,
sess_pid = SessPid, sess_pid = SessPid}) ->
on_node = Node}) -> Node = node(SessPid),
case emqttd:is_running(Node) of
true ->
case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
ok -> ok ->
case remove_session(Session) of remove_session(Session);
{atomic, ok} -> ok; {badrpc, nodedown} ->
{aborted, Error} -> {error, Error} lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]),
end; remove_session(Session);
{badrpc, Reason} -> {badrpc, Reason} ->
lager:critical("Destroy session ~s on remote node ~p failed for ~p", lager:error("Session(~s): Failed to destory ~p on remote node ~p for ~s",
[ClientId, Node, Reason]), [ClientId, SessPid, Node, Reason]),
{error, list_to_atom("session_" ++ atom_to_list(Reason))} {error, Reason}
end;
false ->
lager:error("Session ~s died for node ~p down!", [ClientId, Node]),
case remove_session(Session) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end
end. end.
remove_session(Session) -> remove_session(Session) ->
mnesia:transaction(fun() -> mnesia:delete_object(session, Session, write) end). case mnesia:transaction(fun mnesia:delete_object/3, [session, Session, write]) of
{atomic, ok} -> ok;
{aborted, Error} -> {error, Error}
end.

View File

@ -25,14 +25,14 @@
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% TODO: Monitor mnesia node down...
-module(emqttd_sm_helper). -module(emqttd_sm_helper).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -42,7 +42,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {statsfun, ticker}). -record(state, {stats_fun, tick_tref}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start a session helper %% @doc Start a session helper
@ -53,31 +53,51 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) -> init([]) ->
mnesia:subscribe(system),
{ok, TRef} = timer:send_interval(1000, tick),
StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
{ok, TRef} = timer:send_interval(1000, self(), tick), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
{ok, #state{statsfun = StatsFun, ticker = TRef}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:critical("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
lager:error("!!!Mnesia node down: ~s", [Node]),
Fun = fun() ->
ClientIds =
mnesia:select(session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},
[{'==', {node, '$2'}, Node}],
['$1']}]),
lists:foreach(fun(ClientId) -> mnesia:delete({session, ClientId}) end, ClientIds)
end,
mnesia:async_dirty(Fun),
{noreply, State};
handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) ->
{noreply, State};
handle_info(tick, State) -> handle_info(tick, State) ->
{noreply, setstats(State)}; {noreply, setstats(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:critical("Unexpected Info: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State = #state{ticker = TRef}) -> terminate(_Reason, _State = #state{tick_tref = TRef}) ->
timer:cancel(TRef). timer:cancel(TRef),
mnesia:unsubscribe(system).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
setstats(State = #state{statsfun = StatsFun}) -> %%%=============================================================================
%%% Internal functions
%%%=============================================================================
setstats(State = #state{stats_fun = StatsFun}) ->
StatsFun(ets:info(mqtt_persistent_session, size)), State. StatsFun(ets:info(mqtt_persistent_session, size)), State.

View File

@ -206,7 +206,7 @@ handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto
stop({shutdown, websocket_closed}, State); stop({shutdown, websocket_closed}, State);
handle_info(Info, State = #client_state{request = Req}) -> handle_info(Info, State = #client_state{request = Req}) ->
lager:critical("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), lager:error("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]),
noreply(State). noreply(State).
terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) ->