Merge pull request #371 from emqtt/0.13

fix issue #357 - Cannot kick transient client out when clientId colli…
This commit is contained in:
Feng Lee 2015-11-06 21:31:12 +08:00
commit bdea67cf2c
2 changed files with 56 additions and 15 deletions

View File

@ -41,6 +41,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {id, statsfun}). -record(state, {id, statsfun}).
-define(CM_POOL, ?MODULE). -define(CM_POOL, ?MODULE).
@ -101,8 +104,21 @@ init([Id, StatsFun]) ->
gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}. {ok, #state{id = Id, statsfun = StatsFun}}.
prioritise_call(_Req, _From, _Len, _State) ->
1.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{register, _Client} -> 2;
{unregister, _ClientId, _Pid} -> 3;
_ -> 1
end.
prioritise_info(_Msg, _Len, _State) ->
1.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("unexpected request: ~p", [Req]), lager:error("Unexpected request: ~p", [Req]),
{reply, {error, unsupported_req}, State}. {reply, {error, unsupported_req}, State}.
handle_cast({register, Client = #mqtt_client{client_id = ClientId, handle_cast({register, Client = #mqtt_client{client_id = ClientId,
@ -110,32 +126,45 @@ handle_cast({register, Client = #mqtt_client{client_id = ClientId,
case ets:lookup(mqtt_client, ClientId) of case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] -> [#mqtt_client{client_pid = Pid}] ->
ignore; ignore;
[#mqtt_client{client_pid = OldPid}] -> [#mqtt_client{client_pid = _OldPid, client_mon = MRef}] ->
%% TODO: should cancel monitor %% demonitor
?LOG(warning, "client ~p conflict with ~p", [Pid, OldPid], Client); erlang:demonitor(MRef, [flush]);
[] -> [] ->
ok ok
end, end,
ets:insert(mqtt_client, Client), ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_cast({unregister, ClientId, Pid}, State) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(mqtt_client, ClientId) of case ets:lookup(mqtt_client, ClientId) of
[#mqtt_client{client_pid = Pid}] -> [#mqtt_client{client_pid = Pid, client_mon = MRef}] ->
ets:delete(mqtt_client, ClientId); erlang:demonitor(MRef, [flush]),
[_] -> ets:delete(mqtt_client, ClientId),
ignore;
[] ->
lager:warning("CM(~s): Cannot find registered pid ~p", [ClientId, Pid])
end,
{noreply, setstats(State)}; {noreply, setstats(State)};
[_] ->
{noreply, State};
[] ->
lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]),
{noreply, State}
end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, Reason}, State) ->
MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'},
case ets:match_object(mqtt_client, MP) of
[Client] ->
?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client),
ets:delete_object(mqtt_client, Client);
[] ->
ignore
end,
{noreply, setstats(State)};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Msg: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{id = Id}) -> terminate(_Reason, #state{id = Id}) ->

View File

@ -378,6 +378,7 @@ handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId, handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
clean_sess = CleanSess,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
@ -405,10 +406,21 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C
[cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)], [cancel_timer(TRef) || TRef <- maps:values(AwaitingComp)],
Session1 = Session#session{client_pid = ClientPid, Session1 = Session#session{client_pid = ClientPid,
clean_sess = false,
awaiting_ack = #{}, awaiting_ack = #{},
awaiting_comp = #{}, awaiting_comp = #{},
expired_timer = undefined}, expired_timer = undefined},
%% CleanSess: true -> false?
if
CleanSess =:= true ->
?LOG(warning, "CleanSess changed to false.", [], Session),
emqttd_sm:unregister_session(CleanSess, ClientId),
emqttd_sm:register_session(false, ClientId, sess_info(Session1));
CleanSess =:= false ->
ok
end,
%% Redeliver inflight messages %% Redeliver inflight messages
Session2 = Session2 =
lists:foldl(fun({_Id, Msg}, Sess) -> lists:foldl(fun({_Id, Msg}, Sess) ->