session destroy
This commit is contained in:
parent
40faeddd0e
commit
f54986dd0d
|
@ -29,7 +29,7 @@
|
|||
%% ------------------------------------------------------------------
|
||||
%% API Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2]).
|
||||
-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2, destroy/2]).
|
||||
|
||||
%%start gen_server
|
||||
-export([start_link/3]).
|
||||
|
@ -56,12 +56,12 @@
|
|||
%% ------------------------------------------------------------------
|
||||
%% Start Session
|
||||
%% ------------------------------------------------------------------
|
||||
start({true = CleanSess, ClientId, _ClientPid}) ->
|
||||
start({true = _CleanSess, ClientId, _ClientPid}) ->
|
||||
%%Destroy old session if CleanSess is true before.
|
||||
ok = emqtt_sm:destory_session(ClientId),
|
||||
ok = emqtt_sm:destroy_session(ClientId),
|
||||
{ok, initial_state(ClientId)};
|
||||
|
||||
start({false = CleanSess, ClientId, ClientPid}) ->
|
||||
start({false = _CleanSess, ClientId, ClientPid}) ->
|
||||
{ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid),
|
||||
{ok, SessPid}.
|
||||
|
||||
|
@ -146,10 +146,13 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T
|
|||
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
|
||||
{ok, SessState#session_state{submap = SubMap1}};
|
||||
|
||||
unsubscribe(SessPid, Topics) ->
|
||||
unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
|
||||
gen_server:call(SessPid, {unsubscribe, Topics}),
|
||||
{ok, SessPid}.
|
||||
|
||||
destroy(SessPid, ClientId) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {destroy, ClientId}).
|
||||
|
||||
initial_state(ClientId) ->
|
||||
#session_state { client_id = ClientId,
|
||||
packet_id = 1,
|
||||
|
@ -219,6 +222,10 @@ handle_cast({pubcomp, PacketId}, State) ->
|
|||
NewState = puback(State, {?PUBCOMP, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) ->
|
||||
lager:warning("Session: ~s destroyed", [ClientId]),
|
||||
{stop, normal, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([lookup_session/1, start_session/2, destory_session/1]).
|
||||
-export([lookup_session/1, start_session/2, destroy_session/1]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Exports
|
||||
|
@ -75,7 +75,7 @@
|
|||
|
||||
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}).
|
||||
|
||||
-spec(destory_session/1 :: (binary()) -> ok).
|
||||
-spec(destroy_session/1 :: (binary()) -> ok).
|
||||
|
||||
-endif.
|
||||
|
||||
|
@ -100,8 +100,8 @@ lookup_session(ClientId) ->
|
|||
start_session(ClientId, ClientPid) ->
|
||||
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
|
||||
|
||||
destory_session(ClientId) ->
|
||||
gen_server:call(?SERVER, {destory_session, ClientId}).
|
||||
destroy_session(ClientId) ->
|
||||
gen_server:call(?SERVER, {destroy_session, ClientId}).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Definitions
|
||||
|
@ -130,11 +130,11 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
|||
end,
|
||||
{reply, Reply, State};
|
||||
|
||||
handle_call({destory_session, ClientId}, _From, State) ->
|
||||
handle_call({destroy_session, ClientId}, _From, State) ->
|
||||
case ets:lookup(?TABLE, ClientId) of
|
||||
[{_, SessPid, MRef}] ->
|
||||
erlang:demonitor(MRef),
|
||||
emqtt_session:destory(SessPid),
|
||||
emqtt_session:destory(SessPid, ClientId),
|
||||
ets:delete(?TABLE, ClientId);
|
||||
[] ->
|
||||
ignore
|
||||
|
|
Loading…
Reference in New Issue