diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index 72c2757fb..2c1304e03 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -29,7 +29,7 @@ %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ --export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2]). +-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2, destroy/2]). %%start gen_server -export([start_link/3]). @@ -56,12 +56,12 @@ %% ------------------------------------------------------------------ %% Start Session %% ------------------------------------------------------------------ -start({true = CleanSess, ClientId, _ClientPid}) -> +start({true = _CleanSess, ClientId, _ClientPid}) -> %%Destroy old session if CleanSess is true before. - ok = emqtt_sm:destory_session(ClientId), + ok = emqtt_sm:destroy_session(ClientId), {ok, initial_state(ClientId)}; -start({false = CleanSess, ClientId, ClientPid}) -> +start({false = _CleanSess, ClientId, ClientPid}) -> {ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid), {ok, SessPid}. @@ -146,10 +146,13 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), {ok, SessState#session_state{submap = SubMap1}}; -unsubscribe(SessPid, Topics) -> +unsubscribe(SessPid, Topics) when is_pid(SessPid) -> gen_server:call(SessPid, {unsubscribe, Topics}), {ok, SessPid}. +destroy(SessPid, ClientId) when is_pid(SessPid) -> + gen_server:cast(SessPid, {destroy, ClientId}). + initial_state(ClientId) -> #session_state { client_id = ClientId, packet_id = 1, @@ -219,6 +222,10 @@ handle_cast({pubcomp, PacketId}, State) -> NewState = puback(State, {?PUBCOMP, PacketId}), {noreply, NewState}; +handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) -> + lager:warning("Session: ~s destroyed", [ClientId]), + {stop, normal, State}; + handle_cast(Msg, State) -> {stop, {badmsg, Msg}, State}. diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index 406c699be..f98dc27d3 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -55,7 +55,7 @@ -export([start_link/0]). --export([lookup_session/1, start_session/2, destory_session/1]). +-export([lookup_session/1, start_session/2, destroy_session/1]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -75,7 +75,7 @@ -spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}). --spec(destory_session/1 :: (binary()) -> ok). +-spec(destroy_session/1 :: (binary()) -> ok). -endif. @@ -100,8 +100,8 @@ lookup_session(ClientId) -> start_session(ClientId, ClientPid) -> gen_server:call(?SERVER, {start_session, ClientId, ClientPid}). -destory_session(ClientId) -> - gen_server:call(?SERVER, {destory_session, ClientId}). +destroy_session(ClientId) -> + gen_server:call(?SERVER, {destroy_session, ClientId}). %% ------------------------------------------------------------------ %% gen_server Function Definitions @@ -130,11 +130,11 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) -> end, {reply, Reply, State}; -handle_call({destory_session, ClientId}, _From, State) -> +handle_call({destroy_session, ClientId}, _From, State) -> case ets:lookup(?TABLE, ClientId) of [{_, SessPid, MRef}] -> erlang:demonitor(MRef), - emqtt_session:destory(SessPid), + emqtt_session:destory(SessPid, ClientId), ets:delete(?TABLE, ClientId); [] -> ignore