From 4ed62d018e4cc7566da9fc5af5a627b742e41ce0 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 30 Oct 2015 21:20:37 +0800 Subject: [PATCH] refactor log --- src/emqttd_cm.erl | 21 +++-- src/emqttd_pubsub.erl | 4 +- src/emqttd_session.erl | 208 +++++++++++++++++++++-------------------- src/emqttd_sm.erl | 36 +++---- 4 files changed, 136 insertions(+), 133 deletions(-) diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index db198c176..9f5462b57 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -37,8 +37,6 @@ -behaviour(gen_server2). --define(SERVER, ?MODULE). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -47,6 +45,9 @@ -define(CM_POOL, ?MODULE). +-define(LOG(Level, Format, Args, Client), + lager:Level("CM(~s): " ++ Format, [Client#mqtt_client.client_id|Args])). + %%%============================================================================= %%% API %%%============================================================================= @@ -102,15 +103,16 @@ init([Id, StatsFun]) -> handle_call(Req, _From, State) -> lager:error("unexpected request: ~p", [Req]), - {reply, {error, badreq}, State}. + {reply, {error, unsupported_req}, State}. -handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> +handle_cast({register, Client = #mqtt_client{client_id = ClientId, + client_pid = Pid}}, State) -> case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> - lager:error("ClientId '~s' has been registered with ~p", [ClientId, Pid]), ignore; [#mqtt_client{client_pid = OldPid}] -> - lager:warning("ClientId '~s' is duplicated: pid=~p, oldpid=~p", [ClientId, Pid, OldPid]); + %% TODO: should cancel monitor + ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client); [] -> ok end, @@ -121,10 +123,10 @@ handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ets:delete(mqtt_client, ClientId); - [_] -> + [_] -> ignore; [] -> - lager:error("Cannot find clientId '~s' with ~p", [ClientId, Pid]) + ?LOG(error, "Cannot find registered: ~p", [Pid], State) end, {noreply, setstats(State)}; @@ -137,7 +139,8 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{id = Id}) -> - gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), ok. + gproc_pool:disconnect_worker(?CM_POOL, {?MODULE, Id}), + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 478db721d..decb23d1f 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_pubsub). -author("Feng Lee "). @@ -48,8 +47,7 @@ publish/1]). %% Local node --export([dispatch/2, - match/1]). +-export([dispatch/2, match/1]). -behaviour(gen_server2). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 12583fb5d..a1b4ab47a 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -44,7 +44,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_session). -author("Feng Lee "). @@ -53,16 +52,15 @@ -include("emqttd_protocol.hrl"). +-behaviour(gen_server2). + %% Session API --export([start_link/3, resume/3, destroy/2]). +-export([start_link/3, resume/3, info/1, destroy/2]). %% PubSub APIs --export([publish/2, - puback/2, pubrec/2, pubrel/2, pubcomp/2, +-export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2, subscribe/2, subscribe/3, unsubscribe/2]). --behaviour(gen_server2). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -135,6 +133,10 @@ -define(PUBSUB_TIMEOUT, 60000). +-define(LOG(Level, Format, Args, State), + lager:Level([{client, State#session.client_id}], + "Session(~s): " ++ Format, [State#session.client_id | Args])). + %%------------------------------------------------------------------------------ %% @doc Start a session. %% @end @@ -151,6 +153,13 @@ start_link(CleanSess, ClientId, ClientPid) -> resume(SessPid, ClientId, ClientPid) -> gen_server2:cast(SessPid, {resume, ClientId, ClientPid}). +%%------------------------------------------------------------------------------ +%% @doc Session Info. +%% @end +%%------------------------------------------------------------------------------ +info(SessPid) -> + gen_server2:call(SessPid, info). + %%------------------------------------------------------------------------------ %% @doc Destroy a session. %% @end @@ -171,7 +180,7 @@ subscribe(SessPid, TopicTable) -> subscribe(SessPid, PacketId, TopicTable) -> From = self(), AckFun = fun(GrantedQos) -> - From ! {suback, PacketId, GrantedQos} + From ! {suback, PacketId, GrantedQos} end, gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). @@ -246,12 +255,15 @@ init([CleanSess, ClientId, ClientPid]) -> expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, - emqttd_sm:register_session(CleanSess, ClientId, info(Session)), + emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), %% start statistics {ok, start_collector(Session), hibernate}. prioritise_call(Msg, _From, _Len, _State) -> - case Msg of _ -> 0 end. + case Msg of + info -> 10; + _ -> 0 + end. prioritise_cast(Msg, _Len, _State) -> case Msg of @@ -269,13 +281,16 @@ prioritise_cast(Msg, _Len, _State) -> prioritise_info(Msg, _Len, _State) -> case Msg of {'EXIT', _, _} -> 10; - session_expired -> 10; + expired -> 10; {timeout, _, _} -> 5; collect_info -> 2; {dispatch, _} -> 1; _ -> 0 end. +handle_call(info, _From, State) -> + {reply, sess_info(State), State}; + handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel, @@ -292,11 +307,11 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, end; handle_call(Req, _From, State) -> - lager:error("Unexpected Request: ~p", [Req]), - {reply, ok, State}. + ?LOG(critical, "Unexpected Request: ~p", [Req], State), + {reply, {error, unsupported_req}, State}. -handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{ - client_id = ClientId, subscriptions = Subscriptions}) -> +handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, + subscriptions = Subscriptions}) -> TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), @@ -312,18 +327,16 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{ emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), - lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p", - [ClientId, TopicTable, GrantedQos]), + ?LOG(info, "Subscribe ~p, Granted QoS: ~p", [TopicTable, GrantedQos], Session), Subscriptions1 = lists:foldl(fun({Topic, Qos}, Acc) -> case lists:keyfind(Topic, 1, Acc) of {Topic, Qos} -> - lager:warning([{client, ClientId}], "Session(~s): " - "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc; + ?LOG(warning, "resubscribe ~s, qos = ~w", [Topic, Qos], Session), + Acc; {Topic, OldQos} -> - lager:warning([{client, ClientId}], "Session(~s): " - "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]), + ?LOG(warning, "resubscribe ~s, old qos=~w, new qos=~w", [Topic, OldQos, Qos], Session), lists:keyreplace(Topic, 1, Acc, {Topic, Qos}); false -> %%TODO: the design is ugly, rewrite later...:( @@ -353,41 +366,31 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId, {Topic, _Qos} -> lists:keydelete(Topic, 1, Acc); false -> - lager:warning([{client, ClientId}], "Session(~s) not subscribe ~s", [ClientId, Topic]), Acc + Acc end end, Subscriptions, Topics), noreply(Session#session{subscriptions = Subscriptions1}); handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> - lager:warning([{client, ClientId}], "Session(~s) destroyed", [ClientId]), + ?LOG(warning, "destroyed", [], Session), {stop, {shutdown, destroy}, Session}; -handle_cast({resume, ClientId, ClientPid}, Session) -> +handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, + client_pid = OldClientPid, + inflight_queue = InflightQ, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expired_timer = ETimer} = Session) -> - #session{client_id = ClientId, - client_pid = OldClientPid, - inflight_queue = InflightQ, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expired_timer = ETimer} = Session, + ?LOG(info, "resumed by ~p", [ClientPid], Session), - lager:info([{client, ClientId}], "Session(~s) resumed by ~p", [ClientId, ClientPid]), - - %% cancel expired timer + %% Cancel expired timer cancel_timer(ETimer), - %% Kickout old client - if - OldClientPid == undefined -> - ok; - OldClientPid == ClientPid -> - ok; %% ?? - true -> - lager:error([{client, ClientId}], "Session(~s): ~p kickout ~p", - [ClientId, ClientPid, OldClientPid]), - unlink(OldClientPid), - OldClientPid ! {stop, duplicate_id, ClientPid} + case kick(ClientId, OldClientPid, ClientPid) of + ok -> ?LOG(warning, "~p kickout ~p", [ClientPid, OldClientPid], Session); + ignore -> ok end, true = link(ClientPid), @@ -416,19 +419,18 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> noreply(dequeue(Session2)); %% PUBACK -handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck}) -> +handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) -> case maps:find(PktId, AwaitingAck) of {ok, TRef} -> cancel_timer(TRef), noreply(dequeue(acked(PktId, Session))); error -> - lager:error([{client, ClientId}], "Session(~s) cannot find PUBACK ~w", [ClientId, PktId]), + ?LOG(error, "Cannot find PUBACK: ~p", [PktId], Session), noreply(Session) end; %% PUBREC -handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, - awaiting_ack = AwaitingAck, +handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, await_rel_timeout = Timeout}) -> case maps:find(PktId, AwaitingAck) of @@ -439,37 +441,36 @@ handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}), noreply(dequeue(Session1)); error -> - lager:error([{client, ClientId}], "Session(~s) cannot find PUBREC ~w", [ClientId, PktId]), + ?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session), noreply(Session) end; %% PUBREL -handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel}) -> +handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> - lager:error([{client, ClientId}], "Session(~s) cannot find PUBREL ~w", [ClientId, PktId]), + ?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session), noreply(Session) end; %% PUBCOMP -handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) -> +handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) -> case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}); error -> - lager:error("Session(~s) cannot find PUBCOMP ~w", [ClientId, PktId]), + ?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session), noreply(Session) end; handle_cast(Msg, State) -> - lager:error("Unexpected Msg: ~p, State: ~p", [Msg, State]), - {noreply, State}. + ?LOG(critical, "Unexpected Msg: ~p", [Msg], State), + noreply(State). %% Queue messages when client is offline handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, @@ -483,14 +484,15 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, ClientPid ! {deliver, Msg}, noreply(Session); -handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, Session = #session{message_queue = MsgQ}) +handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, + Session = #session{message_queue = MsgQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case check_inflight(Session) of - true -> - {noreply, deliver(Msg, Session)}; + true -> + noreply(deliver(Msg, Session)); false -> - {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} + noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}) end; handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, @@ -498,81 +500,70 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = unde %% just remove awaiting noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}); -handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, - inflight_queue = InflightQ, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> - lager:info("Awaiting Ack Timeout: ~p:", [PktId]), case maps:find(PktId, AwaitingAck) of {ok, _TRef} -> case lists:keyfind(PktId, 1, InflightQ) of {_, Msg} -> noreply(redeliver(Msg, Session)); false -> - lager:error([{client, ClientId}], "Session(~s):" - "Awaiting timeout but Cannot find PktId :~p", [ClientId, PktId]), + ?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session), noreply(dequeue(Session)) end; error -> - lager:error([{client, ClientId}], "Session(~s):" - "Cannot find Awaiting Ack:~p", [ClientId, PktId]), + ?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session), noreply(Session) end; -handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, - awaiting_rel = AwaitingRel}) -> +handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:find(PktId, AwaitingRel) of - {ok, {Msg, _TRef}} -> - lager:error([{client, ClientId}], "Session(~s) AwaitingRel Timout!~n" - "Drop Message:~p", [ClientId, Msg]), + {ok, {_Msg, _TRef}} -> + ?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session), noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}); error -> - lager:error([{client, ClientId}], "Session(~s) cannot find AwaitingRel ~w", [ClientId, PktId]), - {noreply, Session, hibernate} + ?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session), + noreply(Session) end; -handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId, - awaiting_comp = Awaiting}) -> +handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) -> case maps:find(PktId, Awaiting) of {ok, _TRef} -> - lager:error([{client, ClientId}], "Session(~s) " - "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), + ?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session), noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}); error -> - lager:error([{client, ClientId}], "Session(~s) " - "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), + ?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session), noreply(Session) end; handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) -> - emqttd_sm:register_session(CleanSess, ClientId, info(Session)), - {noreply, start_collector(Session), hibernate}; + emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), + noreply(start_collector(Session)); handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) -> {stop, normal, Session}; handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - client_id = ClientId, client_pid = ClientPid, expired_after = Expires}) -> - lager:info("Session(~s): unlink with client ~p: reason=~p", - [ClientId, ClientPid, Reason]), - TRef = timer(Expires, session_expired), + ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session), + TRef = timer(Expires, expired), + erlang:garbage_collect(), %%TODO: ??? noreply(Session#session{client_pid = undefined, expired_timer = TRef}); -handle_info({'EXIT', Pid, Reason}, Session = #session{client_id = ClientId, - client_pid = ClientPid}) -> +handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) -> - lager:error("Session(~s): Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", - [ClientId, ClientPid, Pid, Reason]), + ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", + [ClientPid, Pid, Reason], Session), noreply(Session); -handle_info(session_expired, Session = #session{client_id = ClientId}) -> - lager:error("Session(~s) expired, shutdown now.", [ClientId]), +handle_info(expired, Session) -> + ?LOG(info, "expired, shutdown now.", [], Session), {stop, {shutdown, expired}, Session}; -handle_info(Info, Session = #session{client_id = ClientId}) -> - lager:error("Session(~s) unexpected info: ~p", [ClientId, Info]), +handle_info(Info, Session) -> + ?LOG(critical, "Unexpected info: ~p", [Info], Session), {noreply, Session}. terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> @@ -585,6 +576,17 @@ code_change(_OldVsn, Session, _Extra) -> %%% Internal functions %%%============================================================================= +%%------------------------------------------------------------------------------ +%% Kick old client out +%%------------------------------------------------------------------------------ +kick(_ClientId, undefined, _Pid) -> + ignore; +kick(_ClientId, Pid, Pid) -> + ignore; +kick(ClientId, OldPid, Pid) -> + unlink(OldPid), + OldPid ! {shutdown, conflict, {ClientId, Pid}}. + %%------------------------------------------------------------------------------ %% Check inflight and awaiting_rel %%------------------------------------------------------------------------------ @@ -658,7 +660,7 @@ acked(PktId, Session = #session{client_id = ClientId, {_, Msg} -> emqttd_broker:foreach_hooks('message.acked', [ClientId, Msg]); false -> - lager:error("Session(~s): Cannot find acked message: ~p", [PktId]) + ?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session) end, Session#session{awaiting_ack = maps:remove(PktId, Awaiting), inflight_queue = lists:keydelete(PktId, 1, InflightQ)}. @@ -687,15 +689,15 @@ start_collector(Session = #session{collect_interval = Interval}) -> TRef = erlang:send_after(timer:seconds(Interval), self(), collect_info), Session#session{collect_timer = TRef}. -info(#session{clean_sess = CleanSess, - subscriptions = Subscriptions, - inflight_queue = InflightQueue, - max_inflight = MaxInflight, - message_queue = MessageQueue, - awaiting_rel = AwaitingRel, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - timestamp = CreatedAt}) -> +sess_info(#session{clean_sess = CleanSess, + subscriptions = Subscriptions, + inflight_queue = InflightQueue, + max_inflight = MaxInflight, + message_queue = MessageQueue, + awaiting_rel = AwaitingRel, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + timestamp = CreatedAt}) -> Stats = emqttd_mqueue:stats(MessageQueue), [{clean_sess, CleanSess}, {subscriptions, Subscriptions}, diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 719ac0ca1..d62e85c86 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -57,7 +57,10 @@ -define(SM_POOL, ?MODULE). --define(SESSION_TIMEOUT, 60000). +-define(CALL_TIMEOUT, 60000). + +-define(LOG(Level, Format, Args, Session), + lager:Level("SM(~s): " ++ Format, [Session#mqtt_session.client_id | Args])). %%%============================================================================= %%% Mnesia callbacks @@ -113,7 +116,7 @@ start_session(CleanSess, ClientId) -> lookup_session(ClientId) -> case mnesia:dirty_read(session, ClientId) of [Session] -> Session; - [] -> undefined + [] -> undefined end. %%------------------------------------------------------------------------------ @@ -137,13 +140,11 @@ register_session(CleanSess, ClientId, Info) -> unregister_session(CleanSess, ClientId) -> ets:delete(sesstab(CleanSess), {ClientId, self()}). -sesstab(true) -> - mqtt_transient_session; -sesstab(false) -> - mqtt_persistent_session. +sesstab(true) -> mqtt_transient_session; +sesstab(false) -> mqtt_persistent_session. call(SM, Req) -> - gen_server2:call(SM, Req, ?SESSION_TIMEOUT). %%infinity). + gen_server2:call(SM, Req, ?CALL_TIMEOUT). %%infinity). %%%============================================================================= %%% gen_server callbacks @@ -223,8 +224,8 @@ create_session(CleanSess, ClientId, ClientPid) -> case insert_session(Session) of {aborted, {conflict, ConflictPid}} -> %% Conflict with othe node? - lager:error("Session(~s): Conflict with ~p!", [ClientId, ConflictPid]), - {error, conflict}; + lager:error("SM(~s): Conflict with ~p", [ClientId, ConflictPid]), + {error, mnesia_conflict}; {atomic, ok} -> erlang:monitor(process, SessPid), {ok, SessPid} @@ -245,8 +246,8 @@ insert_session(Session = #mqtt_session{client_id = ClientId}) -> end). %% Local node -resume_session(#mqtt_session{client_id = ClientId, - sess_pid = SessPid}, ClientPid) +resume_session(Session = #mqtt_session{client_id = ClientId, + sess_pid = SessPid}, ClientPid) when node(SessPid) =:= node() -> case is_process_alive(SessPid) of @@ -254,7 +255,7 @@ resume_session(#mqtt_session{client_id = ClientId, emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; false -> - lager:error("Session(~s): Cannot resume ~p, it seems already dead!", [ClientId, SessPid]), + ?LOG(error, "Cannot resume ~p which seems already dead!", [SessPid], Session), {error, session_died} end; @@ -265,12 +266,11 @@ resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid} ok -> {ok, SessPid}; {badrpc, nodedown} -> - lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]), + ?LOG(error, "Session died for node '~s' down", [Node], Session), remove_session(Session), {error, session_nodedown}; {badrpc, Reason} -> - lager:error("Session(~s): Failed to resume from node ~s for ~p", - [ClientId, Node, Reason]), + ?LOG(error, "Failed to resume from node ~s for ~p", [Node, Reason], Session), {error, Reason} end. @@ -288,11 +288,11 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, ok -> remove_session(Session); {badrpc, nodedown} -> - lager:error("Session(~s): Died for node ~s down!", [ClientId, Node]), + ?LOG(error, "Node '~s' down", [Node], Session), remove_session(Session); {badrpc, Reason} -> - lager:error("Session(~s): Failed to destory ~p on remote node ~p for ~s", - [ClientId, SessPid, Node, Reason]), + ?LOG(error, "Failed to destory ~p on remote node ~p for ~s", + [SessPid, Node, Reason], Session), {error, Reason} end.