This commit is contained in:
Ery Lee 2015-01-13 16:45:53 +08:00
parent cf37428c91
commit 0301644793
4 changed files with 153 additions and 27 deletions

View File

@ -138,10 +138,8 @@ handle_packet(?CONNECT, Packet = #mqtt_packet {
send_packet( #mqtt_packet { send_packet( #mqtt_packet {
header = #mqtt_packet_header { type = ?CONNACK }, header = #mqtt_packet_header { type = ?CONNACK },
variable = #mqtt_packet_connack{ return_code = ReturnCode1 }}, State1 ), variable = #mqtt_packet_connack{ return_code = ReturnCode1 }}, State1 ),
%% %%Starting session
{ok, Session} = emqtt_session:start({CleanSess, ClientId, self()}), {ok, Session} = emqtt_session:start({CleanSess, ClientId, self()}),
emqtt_session:resume(Session),
%%TODO: Resume session
{ok, State1#proto_state { session = Session }}; {ok, State1#proto_state { session = Session }};
handle_packet(?PUBLISH, Packet = #mqtt_packet { handle_packet(?PUBLISH, Packet = #mqtt_packet {

View File

@ -31,6 +31,8 @@
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
-export([start/1, resume/1, publish/2, puback/2]). -export([start/1, resume/1, publish/2, puback/2]).
%%start gen_server
-export([start_link/3]).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
@ -45,22 +47,30 @@
subscriptions = [], subscriptions = [],
messages = [], %% do not receive rel messages = [], %% do not receive rel
awaiting_ack, awaiting_ack,
awaiting_rel }). awaiting_rel,
expires,
max_queue }).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Definitions %% Start Session
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
start({true = CleanSess, ClientId, ClientPid}) -> start({true = CleanSess, ClientId, _ClientPid}) ->
%%destroy old session %%Destroy old session if CleanSess is true before.
%%TODO: emqtt_sm:destory_session(ClientId), ok = emqtt_sm:destory_session(ClientId),
{ok, initial_state(ClientId)}; {ok, initial_state(ClientId)};
start({false = CleanSess, ClientId, ClientPid}) -> start({false = CleanSess, ClientId, ClientPid}) ->
%%TODO: emqtt_sm:start_session({ClientId, ClientPid}) {ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid),
gen_server:start_link(?MODULE, [ClientId, ClientPid], []). {ok, SessPid}.
resume(#session_state {}) -> 'TODO'; %% ------------------------------------------------------------------
resume(SessPid) when is_pid(SessPid) -> 'TODO'. %% Session API
%% ------------------------------------------------------------------
resume(SessState = #session_state{}, _ClientPid) ->
SessState;
resume(SessPid, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientPid}),
SessPid.
publish(_, {?QOS_0, Message}) -> publish(_, {?QOS_0, Message}) ->
emqtt_router:route(Message); emqtt_router:route(Message);
@ -113,10 +123,16 @@ initial_state(ClientId, ClientPid) ->
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init([ClientId, ClientPid]) -> start_link(SessOpts, ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
init([SessOpts, ClientId, ClientPid]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%%TODO: OK?
true = link(ClientPid),
State = initial_state(ClientId, ClientPid), State = initial_state(ClientId, ClientPid),
{ok, State}. {ok, State#state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600,
max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.

View File

@ -0,0 +1,57 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_session_sup).
-author('feng@emqtt.io').
-behavior(supervisor).
-export([start_link/0, start_session/2]).
-export([init/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/1 :: (list(tuple())) -> {ok, pid()}).
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()}).
-endif.
%%----------------------------------------------------------------------------
start_link(SessOpts) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]).
start_session(ClientId, ClientPid) ->
supervisor:start_child(?MODULE, [ClientId, ClientPid]).
init([SessOpts]) ->
{ok, {{simple_one_for_one, 0, 1},
[{session, {emqtt_session, start_link, [SessOpts]},
transient, 10000, worker, [emqtt_session]}]}}.

View File

@ -47,13 +47,15 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TABLE, emqtt_session).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Exports %% API Function Exports
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
-export([start_link/1]). -export([start_link/0]).
-export([lookup/1, register/2, resume/2, destroy/1]). -export([lookup_session/1, start_session/2, destroy_session/1]).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
@ -62,34 +64,82 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, { expires = 24, %hours
max_queue = 1000 }).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> {ok, pid()}).
-spec(lookup_session/1 :: (binary()) -> pid() | undefined).
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}).
-spec(destroy_session/1 :: (binary()) -> ok).
-endif.
%%----------------------------------------------------------------------------
-record(state, {}).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% API Function Definitions %% API Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
start_link(SessOpts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [SessOpts], []).
lookup(ClientId) -> ok. start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register(ClientId, Pid) -> ok. lookup_session(ClientId) ->
case ets:lookup(?TABLE, ClientId) of
[{_, SessPid, _}] -> SessPid;
[] -> undefined
end.
resume(ClientId, Pid) -> ok. start_session(ClientId, ClientPid) ->
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
destroy(ClientId) -> ok. destory_session(ClientId) ->
gen_server:call(?SERVER, {destory_session, ClientId}).
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
%% gen_server Function Definitions %% gen_server Function Definitions
%% ------------------------------------------------------------------ %% ------------------------------------------------------------------
init(SessOpts) -> init() ->
State = #state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600, process_flag(trap_exit, true),
max_queue = proplists:get_value(max_queue, SessOpts, 1000) }, ets:new(?TABLE, [set, protected, named_table]),
{ok, State}. {ok, State}.
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
Reply =
case ets:lookup(?TABLE, ClientId) of
[{_, SessPid, MRef}] ->
emqtt_session:resume(SessPid, ClientPid),
{ok, SessPid};
[] ->
case emqtt_session_sup:start_session(ClientId, ClientPid) of
{ok, SessPid} ->
MRef = erlang:monitor(process, SessPid),
ets:insert(?TABLE, {ClientId, SessPid, MRef}),
{ok, SessPid};
{error, Error} ->
{error, Error}
end
end,
{reply, Reply, State};
handle_call({destory_session, ClientId}, _From, State) ->
case ets:lookup(?TABLE, ClientId) of
[{_, SessPid, MRef}] ->
erlang:demonitor(MRef),
emqtt_session:destory(SessPid),
ets:delete(?TABLE, ClientId);
[] ->
ignore
end,
{reply, ok, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
@ -97,6 +147,10 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(emqtt_client, {{'_', DownPid, MRef}}),
{noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
@ -106,3 +160,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.