This commit is contained in:
Feng 2015-11-11 09:46:23 +08:00
parent 4d2e2168d1
commit 4fb018203e
3 changed files with 58 additions and 40 deletions

View File

@ -91,7 +91,6 @@
-record(mqtt_client, { -record(mqtt_client, {
client_id :: binary() | undefined, client_id :: binary() | undefined,
client_pid :: pid(), client_pid :: pid(),
client_mon :: reference(),
username :: binary() | undefined, username :: binary() | undefined,
peername :: {inet:ip_address(), integer()}, peername :: {inet:ip_address(), integer()},
clean_sess :: boolean(), clean_sess :: boolean(),

View File

@ -33,7 +33,7 @@
%% API Exports %% API Exports
-export([start_link/2, pool/0]). -export([start_link/2, pool/0]).
-export([lookup/1, register/1, unregister/1]). -export([lookup/1, lookup_proc/1, register/1, unregister/1]).
-behaviour(gen_server2). -behaviour(gen_server2).
@ -44,7 +44,7 @@
%% gen_server2 priorities %% gen_server2 priorities
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]). -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3]).
-record(state, {id, statsfun}). -record(state, {id, statsfun, monitors}).
-define(CM_POOL, ?MODULE). -define(CM_POOL, ?MODULE).
@ -68,7 +68,7 @@ start_link(Id, StatsFun) ->
pool() -> ?CM_POOL. pool() -> ?CM_POOL.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup client pid with clientId %% @doc Lookup client by clientId
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined. -spec lookup(ClientId :: binary()) -> mqtt_client() | undefined.
@ -78,6 +78,18 @@ lookup(ClientId) when is_binary(ClientId) ->
[] -> undefined [] -> undefined
end. end.
%%------------------------------------------------------------------------------
%% @doc Lookup client pid by clientId
%% @end
%%------------------------------------------------------------------------------
-spec lookup_proc(ClientId :: binary()) -> pid() | undefined.
lookup_proc(ClientId) when is_binary(ClientId) ->
try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid) of
Pid -> Pid
catch
error:badarg -> undefined
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Register clientId with pid. %% @doc Register clientId with pid.
%% @end %% @end
@ -102,7 +114,7 @@ unregister(ClientId) when is_binary(ClientId) ->
init([Id, StatsFun]) -> init([Id, StatsFun]) ->
gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}), gproc_pool:connect_worker(?CM_POOL, {?MODULE, Id}),
{ok, #state{id = Id, statsfun = StatsFun}}. {ok, #state{id = Id, statsfun = StatsFun, monitors = dict:new()}}.
prioritise_call(_Req, _From, _Len, _State) -> prioritise_call(_Req, _From, _Len, _State) ->
1. 1.
@ -110,7 +122,7 @@ prioritise_call(_Req, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) -> prioritise_cast(Msg, _Len, _State) ->
case Msg of case Msg of
{register, _Client} -> 2; {register, _Client} -> 2;
{unregister, _ClientId, _Pid} -> 3; {unregister, _ClientId, _Pid} -> 9;
_ -> 1 _ -> 1
end. end.
@ -123,28 +135,20 @@ handle_call(Req, _From, State) ->
handle_cast({register, Client = #mqtt_client{client_id = ClientId, handle_cast({register, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, State) -> client_pid = Pid}}, State) ->
case ets:lookup(mqtt_client, ClientId) of case lookup_proc(ClientId) of
[#mqtt_client{client_pid = Pid}] -> Pid ->
ignore; {noreply, State};
[#mqtt_client{client_pid = _OldPid, client_mon = MRef}] -> _None ->
%% demonitor ets:insert(mqtt_client, Client),
erlang:demonitor(MRef, [flush]); {noreply, setstats(monitor_client(ClientId, Pid, State))}
[] -> end;
ok
end,
ets:insert(mqtt_client, Client#mqtt_client{client_mon = erlang:monitor(process, Pid)}),
{noreply, setstats(State)};
handle_cast({unregister, ClientId, Pid}, State) -> handle_cast({unregister, ClientId, Pid}, State) ->
case ets:lookup(mqtt_client, ClientId) of case lookup_proc(ClientId) of
[#mqtt_client{client_pid = Pid, client_mon = MRef}] -> Pid ->
erlang:demonitor(MRef, [flush]),
ets:delete(mqtt_client, ClientId), ets:delete(mqtt_client, ClientId),
{noreply, setstats(State)}; {noreply, setstats(State)};
[_] -> undefined ->
{noreply, State};
[] ->
lager:warning("CM(~s): Cannot find pid ~p", [ClientId, Pid]),
{noreply, State} {noreply, State}
end; end;
@ -152,16 +156,20 @@ handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), lager:error("Unexpected Msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State = #state{monitors = MonDict}) ->
MP = #mqtt_client{client_pid = DownPid, client_mon = MRef, _ = '_'}, case dict:find(MRef, MonDict) of
case ets:match_object(mqtt_client, MP) of {ok, {ClientId, DownPid}} ->
[Client] -> case lookup_proc(ClientId) of
?LOG(warning, "client ~p DOWN for ~p", [DownPid, Reason], Client), DownPid ->
ets:delete_object(mqtt_client, Client); ets:delete(mqtt_client, ClientId);
[] -> _ ->
ignore ignore
end, end,
{noreply, setstats(State)}; {noreply, setstats(erase_monitor(MRef, State))};
error ->
lager:error("MRef of client ~p not found", [DownPid]),
{noreply, State}
end;
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), lager:error("Unexpected Info: ~p", [Info]),
@ -178,6 +186,13 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
monitor_client(ClientId, Pid, State = #state{monitors = Monintors}) ->
MRef = erlang:monitor(process, Pid),
State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monintors)}.
erase_monitor(MRef, State = #state{monitors = Monintors}) ->
State#state{monitors = dict:erase(MRef, Monintors)}.
setstats(State = #state{statsfun = StatsFun}) -> setstats(State = #state{statsfun = StatsFun}) ->
StatsFun(ets:info(mqtt_client, size)), State. StatsFun(ets:info(mqtt_client, size)), State.

View File

@ -282,14 +282,18 @@ redeliver({?PUBREL, PacketId}, State) ->
shutdown(_Error, #proto_state{client_id = undefined}) -> shutdown(_Error, #proto_state{client_id = undefined}) ->
ignore; ignore;
shutdown(conflict, #proto_state{client_id = ClientId}) -> shutdown(conflict, #proto_state{client_id = _ClientId}) ->
emqttd_cm:unregister(ClientId); %% let it down
%% emqttd_cm:unregister(ClientId);
ignore;
shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) -> shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
?LOG(info, "Shutdown for ~p", [Error], State), ?LOG(info, "Shutdown for ~p", [Error], State),
send_willmsg(ClientId, WillMsg), send_willmsg(ClientId, WillMsg),
emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]), emqttd_broker:foreach_hooks('client.disconnected', [Error, ClientId]),
emqttd_cm:unregister(ClientId). %% let it down
%% emqttd_cm:unregister(ClientId).
ok.
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqttd_message:from_packet(Packet). emqttd_message:from_packet(Packet).