This commit is contained in:
Feng Lee 2015-01-05 13:04:53 +08:00
parent ec96c155bb
commit 5beb38cd68
5 changed files with 70 additions and 25 deletions

View File

@ -25,8 +25,6 @@
-define(MQTT_PROTO_MAJOR, 3).
-define(MQTT_PROTO_MINOR, 1).
-define(CLIENT_ID_MAXLEN, 23).
%% frame types
-define(CONNECT, 1).

View File

@ -29,7 +29,7 @@
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, false}
{nodelay, true}
]).
listen(Listeners) when is_list(Listeners) ->

View File

@ -173,9 +173,9 @@ handle_info(Info, State) ->
?ERROR("badinfo :~p",[Info]),
{stop, {badinfo, Info}, State}.
terminate(_Reason, #state{keep_alive=KeepAlive}) ->
terminate(_Reason, #state{client_id = ClientId, keep_alive=KeepAlive}) ->
emqtt_keep_alive:cancel(KeepAlive),
emqtt_cm:destroy(self()),
emqtt_cm:destroy(ClientId, self()),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -29,15 +29,15 @@
-define(SERVER, ?MODULE).
-define(TAB, emqtt_client).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
-export([create/2,
destroy/1,
lookup/1]).
-export([lookup/1, create/2, destroy/2]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
@ -59,28 +59,17 @@ start_link() ->
-spec lookup(ClientId :: binary()) -> pid() | undefined.
lookup(ClientId) ->
case ets:lookup(emqtt_client, ClientId) of
[{_, Pid}] -> Pid;
[{_, Pid, _}] -> Pid;
[] -> undefined
end.
-spec create(ClientId :: binary(), Pid :: pid()) -> ok.
create(ClientId, Pid) ->
case lookup(ClientId) of
Pid ->
ignore;
OldPid when is_pid(OldPid) ->
OldPid ! {stop, duplicate_id},
ets:insert(emqtt_client, {ClientId, Pid});
undefined ->
ets:insert(emqtt_client, {ClientId, Pid})
end.
gen_server:call(?SERVER, {create, ClientId, Pid}).
-spec destroy(binary() | pid()) -> ok.
destroy(ClientId) when is_binary(ClientId) ->
ets:delete(emqtt_client, ClientId);
destroy(Pid) when is_pid(Pid) ->
ets:match_delete(emqtt_client, {{'_', Pid}}).
-spec destroy(ClientId :: binary(), Pid :: pid()) -> ok.
destroy(ClientId, Pid) when is_binary(ClientId) ->
gen_server:cast(?SERVER, {destroy, ClientId, Pid});
%% ------------------------------------------------------------------
%% gen_server Function Definitions
@ -88,15 +77,45 @@ destroy(Pid) when is_pid(Pid) ->
init(Args) ->
%on one node
ets:new(emqtt_client, [named_table, public]),
ets:new(?TAB, [set, named_table, protected]),
{ok, Args}.
handle_call({create, ClientId, Pid}, _From, State) ->
case ets:lookup(?TAB, ClientId) of
[{_, Pid, _}] ->
?ERROR("client '~s' has been registered with ~p", [ClientId, Pid]),
ignore;
[{_, OldPid, MRef}] ->
OldPid ! {stop, duplicate_id},
erlang:demonitor(MRef),
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)});
[] ->
ets:insert(emqtt_client, {ClientId, Pid, erlang:monitor(process, Pid)})
end.
{reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({destroy, ClientId, Pid}, State) when is_binary(ClientId) ->
case ets:lookup(?TAB, ClientId) of
[{_, Pid, MRef}] ->
erlang:demonitor(MRef),
ets:delete(?TAB, ClientId);
[_] ->
ignore;
[] ->
?ERROR("cannot find client '~s' with ~p", [ClientId, Pid])
end
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}),
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.

View File

@ -0,0 +1,28 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2014, Feng Lee <feng@slimchat.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_router).
%%Router Chain-->
%%--->In
%%Out<---