diff --git a/apps/emqtt/src/emqtt_app.erl b/apps/emqtt/src/emqtt_app.erl index f742a0167..ce65a92ec 100644 --- a/apps/emqtt/src/emqtt_app.erl +++ b/apps/emqtt/src/emqtt_app.erl @@ -76,7 +76,15 @@ start_servers(Sup) -> end, [{"emqtt config", emqtt_config}, {"emqtt client manager", emqtt_cm}, - {"emqtt session manager", emqtt_sm, SessOpts}, + {"emqtt session manager", emqtt_sm}, + %%TODO: fixme + {"emqtt session supervisor", fun() -> + Mod = emqtt_session_sup, + supervisor:start_child(Sup, + {Mod, + {Mod, start_link, [SessOpts]}, + permanent, 1000, supervisor, [Mod]}) + end}, {"emqtt auth", emqtt_auth}, {"emqtt retained", emqtt_retained}, {"emqtt pubsub", emqtt_pubsub}, diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index 84f162fdf..a3c73a9bb 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -86,6 +86,8 @@ topics() -> subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) -> gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}). + + %% %% @doc Unsubscribe Topic %% diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index 059a70e34..20334ac93 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -29,10 +29,11 @@ %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ --export([start/1, resume/1, publish/2, puback/2]). +-export([start/1, resume/2, publish/2, puback/2, subscribe/2, unsubscribe/2]). %%start gen_server -export([start_link/3]). + %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ @@ -41,13 +42,13 @@ terminate/2, code_change/3]). -record(session_state, { - client_id, - client_pid, - packet_id = 1, - subscriptions = [], - messages = [], %% do not receive rel - awaiting_ack, - awaiting_rel, + client_id :: binary(), + client_pid :: pid(), + packet_id = 1, + submap :: map(), + messages = [], %% do not receive rel + awaiting_ack :: map(), + awaiting_rel :: map(), expires, max_queue }). @@ -66,54 +67,94 @@ start({false = CleanSess, ClientId, ClientPid}) -> %% ------------------------------------------------------------------ %% Session API %% ------------------------------------------------------------------ -resume(SessState = #session_state{}, _ClientPid) -> +resume(SessState = #session_state{}, _ClientPid) -> SessState; -resume(SessPid, ClientPid) when is_pid(SessPid) -> +resume(SessPid, ClientPid) when is_pid(SessPid) -> gen_server:cast(SessPid, {resume, ClientPid}), SessPid. publish(_, {?QOS_0, Message}) -> emqtt_router:route(Message); - %%TODO: publish(_, {?QOS_1, Message}) -> emqtt_router:route(Message); - %%TODO: -publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) -> - %% store gb_tree: - Session#session_state{awaiting_rel = Awaiting}; +publish(SessState = #session_state{awaiting_rel = Awaiting}, + {?QOS_2, Message = #mqtt_message{ msgid = MsgId }}) -> + %% store in awaiting map + %%TODO: TIMEOUT + Awaiting1 = maps:put(MsgId, Message, Awaiting), + SessState#session_state{awaiting_rel = Awaiting1}; -publish(_, {?QOS_2, Message}) -> - %TODO: - %put({msg, PacketId}, pubrec), - emqtt_router:route(Message). +publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {publish, ?QOS_2, Message}), + SessPid. -puback(_, {?PUBACK, PacketId}) -> - 'TODO'; -puback(_, {?PUBREC, PacketId}) -> - 'TODO'; -puback(_, {?PUBREL, PacketId}) -> +puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> + Awaiting1 = + case maps:is_key(PacketId, Awaiting) of + true -> maps:remove(PacketId, Awaiting); + false -> lager:warning("~s puback packetid '~p' not exist", [ClientId, PacketId]) + end, + SessState#session_state{awaiting_ack= Awaiting1}; +puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {puback, PacketId}), SessPid; + +puback(SessState = #session_state{}, {?PUBREC, PacketId}) -> + %%TODO' + SessState; +puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; + +puback(SessState = #session_state{}, {?PUBREL, PacketId}) -> %FIXME Later: should release the message here - erase({msg, PacketId}), - 'TODO'; -puback(_, {?PUBCOMP, PacketId}) -> - 'TODO'. + %%emqtt_router:route(Message). + 'TODO', erase({msg, PacketId}), SessState; +puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {pubrel, PacketId}), SessPid; -subscribe(Session, Topics) -> - %%TODO. - {ok, Session, [Qos || {_Name, Qos} <- Topics]}. +puback(SessState = #session_state{}, {?PUBCOMP, PacketId}) -> + 'TODO', SessState; +puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. -unsubscribe(Session, Topics) -> - %%TODO. - {ok, Session}. +subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> + Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], + case Resubs of + [] -> ok; + _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs]) + end, + SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), + [ok = emqtt_pubsub:subscribe({Topic, Qos}, self()) || {Topic, Qos} <- Topics], + %%TODO: granted all? + GrantedQos = [Qos || {_Name, Qos} <- Topics], + {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; + +subscribe(SessPid, Topics) when is_pid(SessPid) -> + {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), + {ok, SessPid, GrantedQos}. + +unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> + %%TODO: refactor later. + case Topics -- maps:keys(SubMap) of + [] -> ok; + BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs]) + end, + %%unsubscribe from topic tree + [ok = emqtt_pubsub:unsubscribe(Topic, self()) || Topic <- Topics], + SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), + {ok, SessState#session_state{submap = SubMap1}}; + +unsubscribe(SessPid, Topics) -> + gen_server:call(SessPid, {unsubscribe, Topics}), + {ok, SessPid}. initial_state(ClientId) -> - #session_state { client_id = ClientId, - packet_id = 1, - subscriptions = [], - awaiting_ack = gb_trees:empty(), - awaiting_rel = gb_trees:empty() }. + #session_state { client_id = ClientId, + packet_id = 1, + submap = #{}, + awaiting_ack = #{}, + awaiting_rel = #{} }. initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), @@ -131,14 +172,43 @@ init([SessOpts, ClientId, ClientPid]) -> %%TODO: OK? true = link(ClientPid), State = initial_state(ClientId, ClientPid), - {ok, State#state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600, - max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }. + {ok, State#session_state{ + expires = proplists:get_value(expires, SessOpts, 24) * 3600, + max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }. + +handle_call({subscribe, Topics}, _From, State) -> + {ok, NewState, GrantedQos} = subscribe(State, Topics), + {reply, {ok, GrantedQos}, NewState}; + +handle_call({unsubscribe, Topics}, _From, State) -> + {ok, NewState} = unsubscribe(State, Topics), + {reply, ok, NewState}; handle_call(_Request, _From, State) -> {reply, ok, State}. -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast({publish, ?QOS_2, Message}, State) -> + NewState = publish(State, {?QOS_2, Message}), + {noreply, NewState}; + +handle_cast({puback, PacketId}, State) -> + NewState = puback(State, {?PUBACK, PacketId}), + {noreply, NewState}; + +handle_cast({pubrec, PacketId}, State) -> + NewState = puback(State, {?PUBREC, PacketId}), + {noreply, NewState}; + +handle_cast({pubrel, PacketId}, State) -> + NewState = puback(State, {?PUBREL, PacketId}), + {noreply, NewState}; + +handle_cast({pubcomp, PacketId}, State) -> + NewState = puback(State, {?PUBCOMP, PacketId}), + {noreply, NewState}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. handle_info(_Info, State) -> {noreply, State}. diff --git a/apps/emqtt/src/emqtt_session_sup.erl b/apps/emqtt/src/emqtt_session_sup.erl index d4a118bbf..01ea4db7e 100644 --- a/apps/emqtt/src/emqtt_session_sup.erl +++ b/apps/emqtt/src/emqtt_session_sup.erl @@ -26,7 +26,7 @@ -behavior(supervisor). --export([start_link/0, start_session/2]). +-export([start_link/1, start_session/2]). -export([init/1]). diff --git a/apps/emqtt/src/emqtt_sm.erl b/apps/emqtt/src/emqtt_sm.erl index 7714034ef..e473e057c 100644 --- a/apps/emqtt/src/emqtt_sm.erl +++ b/apps/emqtt/src/emqtt_sm.erl @@ -55,7 +55,7 @@ -export([start_link/0]). --export([lookup_session/1, start_session/2, destroy_session/1]). +-export([lookup_session/1, start_session/2, destory_session/1]). %% ------------------------------------------------------------------ %% gen_server Function Exports @@ -75,7 +75,7 @@ -spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}). --spec(destroy_session/1 :: (binary()) -> ok). +-spec(destory_session/1 :: (binary()) -> ok). -endif. @@ -107,15 +107,15 @@ destory_session(ClientId) -> %% gen_server Function Definitions %% ------------------------------------------------------------------ -init() -> +init([]) -> process_flag(trap_exit, true), ets:new(?TABLE, [set, protected, named_table]), - {ok, State}. + {ok, #state{}}. handle_call({start_session, ClientId, ClientPid}, _From, State) -> Reply = case ets:lookup(?TABLE, ClientId) of - [{_, SessPid, MRef}] -> + [{_, SessPid, _MRef}] -> emqtt_session:resume(SessPid, ClientPid), {ok, SessPid}; [] ->