session
This commit is contained in:
parent
0301644793
commit
600a3b0e2c
|
@ -76,7 +76,15 @@ start_servers(Sup) ->
|
|||
end,
|
||||
[{"emqtt config", emqtt_config},
|
||||
{"emqtt client manager", emqtt_cm},
|
||||
{"emqtt session manager", emqtt_sm, SessOpts},
|
||||
{"emqtt session manager", emqtt_sm},
|
||||
%%TODO: fixme
|
||||
{"emqtt session supervisor", fun() ->
|
||||
Mod = emqtt_session_sup,
|
||||
supervisor:start_child(Sup,
|
||||
{Mod,
|
||||
{Mod, start_link, [SessOpts]},
|
||||
permanent, 1000, supervisor, [Mod]})
|
||||
end},
|
||||
{"emqtt auth", emqtt_auth},
|
||||
{"emqtt retained", emqtt_retained},
|
||||
{"emqtt pubsub", emqtt_pubsub},
|
||||
|
|
|
@ -86,6 +86,8 @@ topics() ->
|
|||
subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
||||
gen_server:call(?SERVER, {subscribe, {Topic, Qos}, SubPid}).
|
||||
|
||||
|
||||
|
||||
%%
|
||||
%% @doc Unsubscribe Topic
|
||||
%%
|
||||
|
|
|
@ -29,10 +29,11 @@
|
|||
%% ------------------------------------------------------------------
|
||||
%% API Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
-export([start/1, resume/1, publish/2, puback/2]).
|
||||
-export([start/1, resume/2, publish/2, puback/2, subscribe/2, unsubscribe/2]).
|
||||
|
||||
%%start gen_server
|
||||
-export([start_link/3]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Exports
|
||||
%% ------------------------------------------------------------------
|
||||
|
@ -41,13 +42,13 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
-record(session_state, {
|
||||
client_id,
|
||||
client_pid,
|
||||
packet_id = 1,
|
||||
subscriptions = [],
|
||||
messages = [], %% do not receive rel
|
||||
awaiting_ack,
|
||||
awaiting_rel,
|
||||
client_id :: binary(),
|
||||
client_pid :: pid(),
|
||||
packet_id = 1,
|
||||
submap :: map(),
|
||||
messages = [], %% do not receive rel
|
||||
awaiting_ack :: map(),
|
||||
awaiting_rel :: map(),
|
||||
expires,
|
||||
max_queue }).
|
||||
|
||||
|
@ -66,54 +67,94 @@ start({false = CleanSess, ClientId, ClientPid}) ->
|
|||
%% ------------------------------------------------------------------
|
||||
%% Session API
|
||||
%% ------------------------------------------------------------------
|
||||
resume(SessState = #session_state{}, _ClientPid) ->
|
||||
resume(SessState = #session_state{}, _ClientPid) ->
|
||||
SessState;
|
||||
resume(SessPid, ClientPid) when is_pid(SessPid) ->
|
||||
resume(SessPid, ClientPid) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {resume, ClientPid}),
|
||||
SessPid.
|
||||
|
||||
publish(_, {?QOS_0, Message}) ->
|
||||
emqtt_router:route(Message);
|
||||
|
||||
%%TODO:
|
||||
publish(_, {?QOS_1, Message}) ->
|
||||
emqtt_router:route(Message);
|
||||
|
||||
%%TODO:
|
||||
publish(Session = #session_state{awaiting_rel = Awaiting}, {?QOS_2, Message}) ->
|
||||
%% store gb_tree:
|
||||
Session#session_state{awaiting_rel = Awaiting};
|
||||
publish(SessState = #session_state{awaiting_rel = Awaiting},
|
||||
{?QOS_2, Message = #mqtt_message{ msgid = MsgId }}) ->
|
||||
%% store in awaiting map
|
||||
%%TODO: TIMEOUT
|
||||
Awaiting1 = maps:put(MsgId, Message, Awaiting),
|
||||
SessState#session_state{awaiting_rel = Awaiting1};
|
||||
|
||||
publish(_, {?QOS_2, Message}) ->
|
||||
%TODO:
|
||||
%put({msg, PacketId}, pubrec),
|
||||
emqtt_router:route(Message).
|
||||
publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {publish, ?QOS_2, Message}),
|
||||
SessPid.
|
||||
|
||||
puback(_, {?PUBACK, PacketId}) ->
|
||||
'TODO';
|
||||
puback(_, {?PUBREC, PacketId}) ->
|
||||
'TODO';
|
||||
puback(_, {?PUBREL, PacketId}) ->
|
||||
puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
|
||||
Awaiting1 =
|
||||
case maps:is_key(PacketId, Awaiting) of
|
||||
true -> maps:remove(PacketId, Awaiting);
|
||||
false -> lager:warning("~s puback packetid '~p' not exist", [ClientId, PacketId])
|
||||
end,
|
||||
SessState#session_state{awaiting_ack= Awaiting1};
|
||||
puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {puback, PacketId}), SessPid;
|
||||
|
||||
puback(SessState = #session_state{}, {?PUBREC, PacketId}) ->
|
||||
%%TODO'
|
||||
SessState;
|
||||
puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
|
||||
|
||||
puback(SessState = #session_state{}, {?PUBREL, PacketId}) ->
|
||||
%FIXME Later: should release the message here
|
||||
erase({msg, PacketId}),
|
||||
'TODO';
|
||||
puback(_, {?PUBCOMP, PacketId}) ->
|
||||
'TODO'.
|
||||
%%emqtt_router:route(Message).
|
||||
'TODO', erase({msg, PacketId}), SessState;
|
||||
puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {pubrel, PacketId}), SessPid;
|
||||
|
||||
subscribe(Session, Topics) ->
|
||||
%%TODO.
|
||||
{ok, Session, [Qos || {_Name, Qos} <- Topics]}.
|
||||
puback(SessState = #session_state{}, {?PUBCOMP, PacketId}) ->
|
||||
'TODO', SessState;
|
||||
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
|
||||
gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid.
|
||||
|
||||
unsubscribe(Session, Topics) ->
|
||||
%%TODO.
|
||||
{ok, Session}.
|
||||
subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
|
||||
Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
|
||||
case Resubs of
|
||||
[] -> ok;
|
||||
_ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
|
||||
end,
|
||||
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
||||
[ok = emqtt_pubsub:subscribe({Topic, Qos}, self()) || {Topic, Qos} <- Topics],
|
||||
%%TODO: granted all?
|
||||
GrantedQos = [Qos || {_Name, Qos} <- Topics],
|
||||
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
||||
|
||||
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
||||
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
|
||||
{ok, SessPid, GrantedQos}.
|
||||
|
||||
unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
|
||||
%%TODO: refactor later.
|
||||
case Topics -- maps:keys(SubMap) of
|
||||
[] -> ok;
|
||||
BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs])
|
||||
end,
|
||||
%%unsubscribe from topic tree
|
||||
[ok = emqtt_pubsub:unsubscribe(Topic, self()) || Topic <- Topics],
|
||||
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
|
||||
{ok, SessState#session_state{submap = SubMap1}};
|
||||
|
||||
unsubscribe(SessPid, Topics) ->
|
||||
gen_server:call(SessPid, {unsubscribe, Topics}),
|
||||
{ok, SessPid}.
|
||||
|
||||
initial_state(ClientId) ->
|
||||
#session_state { client_id = ClientId,
|
||||
packet_id = 1,
|
||||
subscriptions = [],
|
||||
awaiting_ack = gb_trees:empty(),
|
||||
awaiting_rel = gb_trees:empty() }.
|
||||
#session_state { client_id = ClientId,
|
||||
packet_id = 1,
|
||||
submap = #{},
|
||||
awaiting_ack = #{},
|
||||
awaiting_rel = #{} }.
|
||||
|
||||
initial_state(ClientId, ClientPid) ->
|
||||
State = initial_state(ClientId),
|
||||
|
@ -131,14 +172,43 @@ init([SessOpts, ClientId, ClientPid]) ->
|
|||
%%TODO: OK?
|
||||
true = link(ClientPid),
|
||||
State = initial_state(ClientId, ClientPid),
|
||||
{ok, State#state{ expires = proplists:get_value(expires, SessOpts, 24) * 3600,
|
||||
max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }.
|
||||
{ok, State#session_state{
|
||||
expires = proplists:get_value(expires, SessOpts, 24) * 3600,
|
||||
max_queue = proplists:get_value(max_queue, SessOpts, 1000) } }.
|
||||
|
||||
handle_call({subscribe, Topics}, _From, State) ->
|
||||
{ok, NewState, GrantedQos} = subscribe(State, Topics),
|
||||
{reply, {ok, GrantedQos}, NewState};
|
||||
|
||||
handle_call({unsubscribe, Topics}, _From, State) ->
|
||||
{ok, NewState} = unsubscribe(State, Topics),
|
||||
{reply, ok, NewState};
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
handle_cast({publish, ?QOS_2, Message}, State) ->
|
||||
NewState = publish(State, {?QOS_2, Message}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({puback, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBACK, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrec, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREC, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubrel, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBREL, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast({pubcomp, PacketId}, State) ->
|
||||
NewState = puback(State, {?PUBCOMP, PacketId}),
|
||||
{noreply, NewState};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
-behavior(supervisor).
|
||||
|
||||
-export([start_link/0, start_session/2]).
|
||||
-export([start_link/1, start_session/2]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([lookup_session/1, start_session/2, destroy_session/1]).
|
||||
-export([lookup_session/1, start_session/2, destory_session/1]).
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% gen_server Function Exports
|
||||
|
@ -75,7 +75,7 @@
|
|||
|
||||
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}).
|
||||
|
||||
-spec(destroy_session/1 :: (binary()) -> ok).
|
||||
-spec(destory_session/1 :: (binary()) -> ok).
|
||||
|
||||
-endif.
|
||||
|
||||
|
@ -107,15 +107,15 @@ destory_session(ClientId) ->
|
|||
%% gen_server Function Definitions
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
init() ->
|
||||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
ets:new(?TABLE, [set, protected, named_table]),
|
||||
{ok, State}.
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||
Reply =
|
||||
case ets:lookup(?TABLE, ClientId) of
|
||||
[{_, SessPid, MRef}] ->
|
||||
[{_, SessPid, _MRef}] ->
|
||||
emqtt_session:resume(SessPid, ClientPid),
|
||||
{ok, SessPid};
|
||||
[] ->
|
||||
|
|
Loading…
Reference in New Issue