diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 1347e1815..b3bfb242b 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -41,6 +41,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_server2 priorities +-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). + -record(state, {id, statsfun}). -define(CM_POOL, ?MODULE). @@ -101,8 +104,21 @@ init([Id, StatsFun]) -> gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), {ok, #state{id = Id, statsfun = StatsFun}}. +prioritise_call(_Req, _From, _Len, _State) -> + 1. + +prioritise_cast(Msg, _Len, _State) -> + case Msg of + {register, _Client} -> 2; + {unregister, _ClientId, _Pid} -> 3; + _ -> 1 + end. + +prioritise_info(_Msg, _Len, _State) -> + 1. + handle_call(Req, _From, State) -> - lager:error("unexpected request: ~p", [Req]), + lager:error("Unexpected request: ~p", [Req]), {reply, {error, unsupported_req}, State}. handle_cast({register, Client = #mqtt_client{client_id = ClientId, @@ -110,32 +126,45 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId, case ets:lookup(mqtt_client, ClientId) of [#mqtt_client{client_pid = Pid}] -> ignore; - [#mqtt_client{client_pid = OldPid}] -> - %% TODO: should cancel monitor - ?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client); - [] -> + [#mqtt_client{client_pid = _OldPid, client_mon = MRef}] -> + %% demonitor + erlang:demonitor(MRef, [flush]); + [] -> ok end, - ets:insert(mqtt_client, Client), + ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}), {noreply, setstats(State)}; handle_cast({unregister, ClientId, Pid}, State) -> case ets:lookup(mqtt_client, ClientId) of - [#mqtt_client{client_pid = Pid}] -> - ets:delete(mqtt_client, ClientId); - [_] -> - ignore; - [] -> - lager:warning("CM(~s): Cannot find registered pid ~p", [ClientId, Pid]) - end, - {noreply, setstats(State)}; + [#mqtt_client{client_pid = Pid, client_mon = MRef}] -> + erlang:demonitor(MRef, [flush]), + ets:delete(mqtt_client, ClientId), + {noreply, setstats(State)}; + [_] -> + {noreply, State}; + [] -> + lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]), + {noreply, State} + end; handle_cast(Msg, State) -> lager:error("Unexpected Msg: ~p", [Msg]), {noreply, State}. +handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> + MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'}, + case ets:match_object(mqtt_client, MP) of + [Client] -> + ?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client), + ets:delete_object(mqtt_client, Client); + [] -> + ignore + end, + {noreply, setstats(State)}; + handle_info(Info, State) -> - lager:error("Unexpected Msg: ~p", [Info]), + lager:error("Unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{id = Id}) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f06037238..98c6df3b8 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -378,6 +378,7 @@ handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, client_pid = OldClientPid, + clean_sess = CleanSess, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, @@ -405,10 +406,21 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], Session1 = Session#session{client_pid = ClientPid, + clean_sess = false, awaiting_ack = #{}, awaiting_comp = #{}, expired_timer = undefined}, + %% CleanSess: true -> false? + if + CleanSess =:= true -> + ?LOG(warning, "CleanSess changed to false.", [], Session), + emqttd_sm:unregister_session(CleanSess, ClientId), + emqttd_sm:register_session(false, ClientId, sess_info(Session1)); + CleanSess =:= false -> + ok + end, + %% Redeliver inflight messages Session2 = lists:foldl(fun({_Id, Msg}, Sess) ->