diff --git a/apps/emqtt/include/emqtt_systop.hrl b/apps/emqtt/include/emqtt_systop.hrl index 382624c14..93ceff73c 100644 --- a/apps/emqtt/include/emqtt_systop.hrl +++ b/apps/emqtt/include/emqtt_systop.hrl @@ -68,37 +68,38 @@ %% Bytes sent and received of Broker %%------------------------------------------------------------------------------ -define(SYSTOP_BYTES, [ - 'bytes/received', % Total bytes received - 'bytes/sent' % Total bytes sent + {counter, 'bytes/received'}, % Total bytes received + {counter, 'bytes/sent'} % Total bytes sent ]). %%------------------------------------------------------------------------------ %% Packets sent and received of Broker %%------------------------------------------------------------------------------ -define(SYSTOP_PACKETS, [ - 'packets/received', % All Packets received - 'packets/sent', % All Packets sent - 'packets/connect', % CONNECT Packets received - 'packets/connack', % CONNACK Packets sent - 'packets/publish/received', % PUBLISH packets received - 'packets/publish/sent', % PUBLISH packets sent - 'packets/subscribe', % SUBSCRIBE Packets received - 'packets/suback', % SUBACK packets sent - 'packets/unsubscribe', % UNSUBSCRIBE Packets received - 'packets/unsuback', % UNSUBACK Packets sent - 'packets/pingreq', % PINGREQ packets received - 'packets/pingresp', % PINGRESP Packets sent - 'packets/disconnect' % DISCONNECT Packets received + {counter, 'packets/received'}, % All Packets received + {counter, 'packets/sent'}, % All Packets sent + {counter, 'packets/connect'}, % CONNECT Packets received + {counter, 'packets/connack'}, % CONNACK Packets sent + {counter, 'packets/publish/received'}, % PUBLISH packets received + {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received + {counter, 'packets/suback'}, % SUBACK packets sent + {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received + {counter, 'packets/unsuback'}, % UNSUBACK Packets sent + {counter, 'packets/pingreq'}, % PINGREQ packets received + {counter, 'packets/pingresp'}, % PINGRESP Packets sent + {counter, 'packets/disconnect'} % DISCONNECT Packets received ]). %%------------------------------------------------------------------------------ %% Messages sent and received of broker %%------------------------------------------------------------------------------ -define(SYSTOP_MESSAGES, [ - 'messages/received', % Messages received - 'messages/sent', % Messages sent - 'messages/retained/count',% Messagea retained - 'messages/stored', % Messages stored - 'messages/dropped' % Messages dropped + {counter, 'messages/received'}, % Messages received + {counter, 'messages/sent'}, % Messages sent + {gauge, 'messages/retained/count'},% Messagea retained + {gauge, 'messages/stored/count'}, % Messages stored + {counter, 'messages/dropped'} % Messages dropped ]). + diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index 07f18c02c..208782033 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -40,8 +40,9 @@ -export([start_link/1]). -export([all/0, value/1, - inc/1, inc/2, - dec/1, dec/2]). + inc/1, inc/2, inc/3, + dec/2, dec/3, + set/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -82,7 +83,7 @@ all() -> %%------------------------------------------------------------------------------ %% @doc -%% Get metric value +%% Get metric value. %% %% @end %%------------------------------------------------------------------------------ @@ -92,23 +93,49 @@ value(Metric) -> %%------------------------------------------------------------------------------ %% @doc -%% Increase metric value +%% Increase counter. %% %% @end %%------------------------------------------------------------------------------ -spec inc(atom()) -> non_neg_integer(). inc(Metric) -> - inc(Metric, 1). + inc(counter, Metric, 1). %%------------------------------------------------------------------------------ %% @doc -%% Increase metric value +%% Increase metric value. %% %% @end %%------------------------------------------------------------------------------ --spec inc(atom(), pos_integer()) -> pos_integer(). -inc(Metric, Val) -> - ets:update_counter(?METRIC_TAB, key(Metric), {2, Val}). +-spec inc(counter | gauge, atom()) -> non_neg_integer(). +inc(gauge, Metric) -> + inc(gauge, Metric, 1); +inc(counter, Metric) -> + inc(counter, Metric, 1); +inc(Metric, Val) when is_atom(Metric) and is_integer(Val) -> + inc(counter, Metric, Val). + +%%------------------------------------------------------------------------------ +%% @doc +%% Increase metric value. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer(). +inc(gauge, Metric, Val) -> + ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val}); +inc(counter, Metric, Val) -> + ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}). + +%%------------------------------------------------------------------------------ +%% @doc +%% Decrease metric value. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec dec(gauge, atom()) -> integer(). +dec(gauge, Metric) -> + dec(gauge, Metric, 1). %%------------------------------------------------------------------------------ %% @doc @@ -116,20 +143,20 @@ inc(Metric, Val) -> %% %% @end %%------------------------------------------------------------------------------ --spec dec(atom()) -> integer(). -dec(Metric) -> - dec(Metric, 1). +-spec dec(gauge, atom(), pos_integer()) -> integer(). +dec(gauge, Metric, Val) -> + ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}). %%------------------------------------------------------------------------------ %% @doc -%% Decrease metric value +%% Set metric value. %% %% @end %%------------------------------------------------------------------------------ --spec dec(atom(), pos_integer()) -> integer(). -dec(Metric, Val) -> - %TODO: ok? - ets:update_counter(?METRIC_TAB, key(Metric), {2, -Val}). +set(Metric, Val) when is_atom(Metric) -> + set(gauge, Metric, Val). +set(gauge, Metric, Val) -> + ets:insert(?METRIC_TAB, key(gauge, Metric), Val). %%------------------------------------------------------------------------------ %% @doc @@ -138,38 +165,40 @@ dec(Metric, Val) -> %% %% @end %%------------------------------------------------------------------------------ -key(Metric) -> +key(gauge, Metric) -> + {Metric, 0}; +key(counter, Metric) -> {Metric, erlang:system_info(scheduler_id)}. -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= init(Options) -> random:seed(now()), - Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, + Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), % Init metrics - [new_metric(Topic) || Topic <- Topics], + [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics], + [{atomic, _} = emqtt_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], PubInterval = proplists:get_value(pub_interval, Options, 60), {ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. handle_info(tick, State) -> % publish metric message [publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()], {noreply, tick(State), hibernate}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. terminate(_Reason, _State) -> ok. @@ -187,7 +216,10 @@ systop(Name) when is_atom(Name) -> publish(Topic, Payload) -> emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}). -new_metric(Name) -> +new_metric({gauge, Name}) -> + ets:insert(?METRIC_TAB, {{Name, 0}, 0}); + +new_metric({counter, Name}) -> Schedulers = lists:seq(1, erlang:system_info(schedulers)), [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. @@ -200,3 +232,4 @@ tick(Delay, State) -> i2b(I) -> list_to_binary(integer_to_list(I)). + diff --git a/apps/emqtt/src/emqtt_protocol.erl b/apps/emqtt/src/emqtt_protocol.erl index 984ace5c4..2d1f42cb5 100644 --- a/apps/emqtt/src/emqtt_protocol.erl +++ b/apps/emqtt/src/emqtt_protocol.erl @@ -318,3 +318,4 @@ inc(?PINGRESP) -> inc(_) -> ingore. + diff --git a/apps/emqtt/src/emqtt_pubsub.erl b/apps/emqtt/src/emqtt_pubsub.erl index 2d28e9cc4..6521b9649 100644 --- a/apps/emqtt/src/emqtt_pubsub.erl +++ b/apps/emqtt/src/emqtt_pubsub.erl @@ -63,7 +63,6 @@ terminate/2, code_change/3]). - -record(state, {max_subs = 0}). %%%============================================================================= @@ -148,14 +147,14 @@ publish(Msg=#mqtt_message{topic=Topic}) -> -spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). publish(Topic, Msg) when is_binary(Topic) -> - lists:foreach(fun(#topic{name=Name, node=Node}) -> + Count = + lists:foldl(fun(#topic{name=Name, node=Node}, Acc) -> case Node =:= node() of - true -> dispatch(Name, Msg); - false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) - end - end, match(Topic)). - -%%TODO: dispatch counts.... + true -> dispatch(Name, Msg) + Acc; + false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) + Acc + end + end, 0, match(Topic)), + dropped(Count =:= 0). %%------------------------------------------------------------------------------ %% @doc @@ -163,14 +162,18 @@ publish(Topic, Msg) when is_binary(Topic) -> %% %% @end %%------------------------------------------------------------------------------ +-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - lists:foreach(fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> - Msg1 = if - Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; - true -> Msg - end, - SubPid ! {dispatch, {self(), Msg1}} - end, ets:lookup(topic_subscriber, Topic)). + Subscribers = ets:lookup(topic_subscriber, Topic), + lists:foreach( + fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> + Msg1 = if + Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; + true -> Msg + end, + SubPid ! {dispatch, {self(), Msg1}} + end, Subscribers), + length(Subscribers). %%------------------------------------------------------------------------------ %% @doc @@ -408,4 +411,8 @@ setstats(State = #state{max_subs = Max}) -> State end. +dropped(true) -> + emqtt_metrics:inc('messages/dropped'); +dropped(false) -> + ok. diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index 8f41253b7..e1e38b1bb 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -1,113 +1,111 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt server. retain messages??? +%%% TODO: redesign... +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_server). -author('feng@slimpp.io'). +-behaviour(gen_server). + +-define(SERVER, ?MODULE). + -include("emqtt.hrl"). -include("emqtt_topic.hrl"). -include("emqtt_packet.hrl"). --behaviour(gen_server). - --define(SERVER, ?MODULE). - --define(RETAINED_TAB, mqtt_retained). - --define(STORE_LIMIT, 100000). - -record(mqtt_retained, {topic, qos, payload}). -record(state, {store_limit}). -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ +-define(RETAINED_TAB, mqtt_retained). -%%TODO: subscribe +-define(STORE_LIMIT, 1000000). + +%% API Function Exports -export([start_link/1, retain/1, subscribe/2]). -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ - -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% API +%%%============================================================================= -start_link(RetainOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [RetainOpts], []). +-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. +start_link(Opts) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). -retain(#mqtt_message{ retain = false }) -> ignore; +retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes -retain(#mqtt_message{ retain = true, topic = Topic, payload = <<>> }) -> +retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> mnesia:dirty_delete(?RETAINED_TAB, Topic); retain(Msg = #mqtt_message{retain = true}) -> gen_server:cast(?SERVER, {retain, Msg}). -%% +%% TODO: this is not right??? subscribe(Topics, CPid) when is_pid(CPid) -> - lager:info("Retained Topics: ~p", [match(Topics)]), RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]), - lager:info("Retained Messages: ~p", [RetainedMsgs]), lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), retained_msg(Msg)}} end, RetainedMsgs). -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= -init([RetainOpts]) -> +init([Opts]) -> mnesia:create_table(mqtt_retained, [ {type, ordered_set}, {ram_copies, [node()]}, {attributes, record_info(fields, mqtt_retained)}]), mnesia:add_table_copy(mqtt_retained, node(), ram_copies), - Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT), + Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT), {ok, #state{store_limit = Limit}}. handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({retain, Msg = #mqtt_message{ qos = Qos, - topic = Topic, - payload = Payload }}, State = #state{store_limit = Limit}) -> +handle_cast({retain, Msg = #mqtt_message{qos = Qos, + topic = Topic, + payload = Payload}}, + State = #state{store_limit = Limit}) -> case mnesia:table_info(?RETAINED_TAB, size) of Size when Size >= Limit -> - lager:error("Server dropped message(retain) for table is full: ~p", [Msg]); - _ -> - %emqtt_metrics:update('messages/retained', Size), - lager:info("Server retained message: ~p", [Msg]), - mnesia:dirty_write(#mqtt_retained{ topic = Topic, - qos = Qos, - payload = Payload }) + lager:error("Dropped message(retain) for table is full: ~p", [Msg]); + Size -> + lager:debug("Retained message: ~p", [Msg]), + mnesia:dirty_write(#mqtt_retained{qos = Qos, + topic = Topic, + payload = Payload}), + emqtt_metrics:set('messages/retained/count', Size) end, {noreply, State}; @@ -123,9 +121,10 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= + match(Topics) -> RetainedTopics = mnesia:dirty_all_keys(?RETAINED_TAB), lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]). @@ -135,9 +134,9 @@ match(Topic, RetainedTopics) -> direct -> %% FIXME [Topic]; wildcard -> - [ T || T <- RetainedTopics, emqtt_topic:match(T, Topic) ] + [T || T <- RetainedTopics, emqtt_topic:match(T, Topic)] end. retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> - #mqtt_message { qos = Qos, retain = true, topic = Topic, payload = Payload }. + #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. diff --git a/apps/emqtt/src/emqtt_session.erl b/apps/emqtt/src/emqtt_session.erl index e52890a1a..186b9a4fd 100644 --- a/apps/emqtt/src/emqtt_session.erl +++ b/apps/emqtt/src/emqtt_session.erl @@ -1,45 +1,50 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqtt session. +%%% +%%% @end +%%%----------------------------------------------------------------------------- -module(emqtt_session). -include("emqtt.hrl"). -include("emqtt_packet.hrl"). -%% ------------------------------------------------------------------ %% API Function Exports -%% ------------------------------------------------------------------ --export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2, destroy/2]). +-export([start/1, + resume/3, + publish/2, + puback/2, + subscribe/2, + unsubscribe/2, + destroy/2]). -export([store/2]). -%%start gen_server +%% Start gen_server -export([start_link/3]). -%% ------------------------------------------------------------------ %% gen_server Function Exports -%% ------------------------------------------------------------------ - -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -53,11 +58,21 @@ awaiting_rel :: map(), awaiting_comp :: map(), expires, - expire_timer }). + expire_timer}). -%% ------------------------------------------------------------------ -%% Start Session -%% ------------------------------------------------------------------ +-type session() :: #session_state{} | pid(). + +%%%============================================================================= +%%% Session API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc +%% Start Session. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start({boolean(), binary(), pid()}) -> {ok, session()}. start({true = _CleanSess, ClientId, _ClientPid}) -> %%Destroy old session if CleanSess is true before. ok = emqtt_sm:destroy_session(ClientId), @@ -67,15 +82,26 @@ start({false = _CleanSess, ClientId, ClientPid}) -> {ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid), {ok, SessPid}. -%% ------------------------------------------------------------------ -%% Session API -%% ------------------------------------------------------------------ +%%------------------------------------------------------------------------------ +%% @doc +%% Resume Session. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec resume(session(), binary(), pid()) -> session(). resume(SessState = #session_state{}, _ClientId, _ClientPid) -> SessState; resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> gen_server:cast(SessPid, {resume, ClientId, ClientPid}), SessPid. +%%------------------------------------------------------------------------------ +%% @doc +%% Publish message. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session(). publish(Session, {?QOS_0, Message}) -> emqtt_router:route(Message), Session; @@ -83,7 +109,7 @@ publish(Session, {?QOS_1, Message}) -> emqtt_router:route(Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, - {?QOS_2, Message = #mqtt_message{ msgid = MsgId }}) -> + {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> %% store in awaiting_rel SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; @@ -91,7 +117,13 @@ publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> gen_server:cast(SessPid, {publish, ?QOS_2, Message}), SessPid. -%% PUBACK +%%------------------------------------------------------------------------------ +%% @doc +%% PubAck message. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> case maps:is_key(PacketId, Awaiting) of true -> ok; @@ -102,21 +134,22 @@ puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {puback, PacketId}), SessPid; %% PUBREC -puback(SessState = #session_state{ client_id = ClientId, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp }, {?PUBREC, PacketId}) -> +puback(SessState = #session_state{client_id = ClientId, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> case maps:is_key(PacketId, AwaitingAck) of true -> ok; false -> lager:warning("Session ~s: PUBREC PacketId '~p' not found!", [ClientId, PacketId]) end, - SessState#session_state{ awaiting_ack = maps:remove(PacketId, AwaitingAck), - awaiting_comp = maps:put(PacketId, true, AwaitingComp) }; + SessState#session_state{awaiting_ack = maps:remove(PacketId, AwaitingAck), + awaiting_comp = maps:put(PacketId, true, AwaitingComp)}; puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; %% PUBREL -puback(SessState = #session_state{client_id = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> +puback(SessState = #session_state{client_id = ClientId, + awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of {ok, Msg} -> emqtt_router:route(Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) @@ -127,18 +160,24 @@ puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubrel, PacketId}), SessPid; %% PUBCOMP -puback(SessState = #session_state{ client_id = ClientId, - awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> +puback(SessState = #session_state{client_id = ClientId, + awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> case maps:is_key(PacketId, AwaitingComp) of true -> ok; false -> lager:warning("Session ~s: PUBREC PacketId '~p' not exist", [ClientId, PacketId]) end, - SessState#session_state{ awaiting_comp = maps:remove(PacketId, AwaitingComp) }; + SessState#session_state{awaiting_comp = maps:remove(PacketId, AwaitingComp)}; puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. -%% SUBSCRIBE +%%------------------------------------------------------------------------------ +%% @doc +%% Subscribe Topics. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], case Resubs of @@ -155,9 +194,13 @@ subscribe(SessPid, Topics) when is_pid(SessPid) -> {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), {ok, SessPid, GrantedQos}. +%%------------------------------------------------------------------------------ +%% @doc +%% Unsubscribe Topics. %% -%% @doc UNSUBSCRIBE -%% +%% @end +%%------------------------------------------------------------------------------ +-spec unsubscribe(session(), [binary()]) -> {ok, session()}. unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> %%TODO: refactor later. case Topics -- maps:keys(SubMap) of @@ -173,50 +216,57 @@ unsubscribe(SessPid, Topics) when is_pid(SessPid) -> gen_server:call(SessPid, {unsubscribe, Topics}), {ok, SessPid}. +%%------------------------------------------------------------------------------ +%% @doc +%% Destroy Session. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok. destroy(SessPid, ClientId) when is_pid(SessPid) -> gen_server:cast(SessPid, {destroy, ClientId}). %store message(qos1) that sent to client -store(SessState = #session_state{ message_id = MsgId, awaiting_ack = Awaiting}, - Message = #mqtt_message{ qos = Qos }) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> +store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting}, + Message = #mqtt_message{qos = Qos}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> %%assign msgid before send - Message1 = Message#mqtt_message{ msgid = MsgId }, + Message1 = Message#mqtt_message{msgid = MsgId}, Message2 = if Qos =:= ?QOS_2 -> Message1#mqtt_message{dup = false}; true -> Message1 end, Awaiting1 = maps:put(MsgId, Message2, Awaiting), - {Message1, next_msg_id(SessState#session_state{ awaiting_ack = Awaiting1 })}. + {Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}. initial_state(ClientId) -> - #session_state { client_id = ClientId, - submap = #{}, - awaiting_ack = #{}, - awaiting_rel = #{}, - awaiting_comp = #{} }. + #session_state{client_id = ClientId, + submap = #{}, + awaiting_ack = #{}, + awaiting_rel = #{}, + awaiting_comp = #{}}. initial_state(ClientId, ClientPid) -> State = initial_state(ClientId), State#session_state{client_pid = ClientPid}. -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ - start_link(SessOpts, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + init([SessOpts, ClientId, ClientPid]) -> process_flag(trap_exit, true), - %%TODO: Is this OK? + %%TODO: Is this OK? should monitor... true = link(ClientPid), State = initial_state(ClientId, ClientPid), Expires = proplists:get_value(expires, SessOpts, 1) * 3600, - MsgQueue = emqtt_queue:new( proplists:get_value(max_queue, SessOpts, 1000), - proplists:get_value(store_qos0, SessOpts, false) ), - {ok, State#session_state{ expires = Expires, - msg_queue = MsgQueue }, hibernate}. + MsgQueue = emqtt_queue:new(proplists:get_value(max_queue, SessOpts, 1000), + proplists:get_value(store_qos0, SessOpts, false)), + {ok, State#session_state{expires = Expires, + msg_queue = MsgQueue}, hibernate}. handle_call({subscribe, Topics}, _From, State) -> {ok, NewState, GrantedQos} = subscribe(State, Topics), @@ -229,7 +279,7 @@ handle_call({unsubscribe, Topics}, _From, State) -> handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({resume, ClientId, ClientPid}, State = #session_state { +handle_cast({resume, ClientId, ClientPid}, State = #session_state{ client_id = ClientId, client_pid = undefined, msg_queue = Queue, @@ -246,7 +296,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state { end, maps:keys(AwaitingComp)), %% redelivery messages that awaiting PUBACK or PUBREC - Dup = fun(Msg) -> Msg#mqtt_message{ dup = true } end, + Dup = fun(Msg) -> Msg#mqtt_message{dup = true} end, lists:foreach(fun(Msg) -> ClientPid ! {dispatch, {self(), Dup(Msg)}} end, maps:values(AwaitingAck)), @@ -256,9 +306,9 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state { ClientPid ! {dispatch, {self(), Msg}} end, emqtt_queue:all(Queue)), - NewState = State#session_state{ client_pid = ClientPid, - msg_queue = emqtt_queue:clear(Queue), - expire_timer = undefined}, + NewState = State#session_state{client_pid = ClientPid, + msg_queue = emqtt_queue:clear(Queue), + expire_timer = undefined}, {noreply, NewState, hibernate}; handle_cast({publish, ?QOS_2, Message}, State) -> @@ -291,11 +341,12 @@ handle_cast(Msg, State) -> handle_info({dispatch, {_From, Message}}, State) -> {noreply, dispatch(Message, State)}; -handle_info({'EXIT', ClientPid, Reason}, State = #session_state{ - client_id = ClientId, client_pid = ClientPid, expires = Expires}) -> +handle_info({'EXIT', ClientPid, Reason}, State = #session_state{client_id = ClientId, + client_pid = ClientPid, + expires = Expires}) -> lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), Timer = erlang:send_after(Expires * 1000, self(), session_expired), - {noreply, State#session_state{ client_pid = undefined, expire_timer = Timer}}; + {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}}; handle_info(session_expired, State = #session_state{client_id = ClientId}) -> lager:warning("Session ~s expired!", [ClientId]), @@ -310,21 +361,20 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ +%%%============================================================================= +%%% Internal functions +%%%============================================================================= -dispatch(Message, State = #session_state{ client_id = ClientId, - client_pid = undefined }) -> +dispatch(Message, State = #session_state{client_id = ClientId, + client_pid = undefined}) -> queue(ClientId, Message, State); -dispatch(Message = #mqtt_message{ qos = ?QOS_0 }, State = #session_state{ - client_pid = ClientPid }) -> +dispatch(Message = #mqtt_message{qos = ?QOS_0}, State = #session_state{client_pid = ClientPid}) -> ClientPid ! {dispatch, {self(), Message}}, State; -dispatch(Message = #mqtt_message{ qos = Qos }, State = #session_state{ client_pid = ClientPid }) - when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> +dispatch(Message = #mqtt_message{qos = Qos}, State = #session_state{client_pid = ClientPid}) + when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> {Message1, NewState} = store(State, Message), ClientPid ! {dispatch, {self(), Message1}}, NewState. @@ -332,10 +382,10 @@ dispatch(Message = #mqtt_message{ qos = Qos }, State = #session_state{ client_pi queue(ClientId, Message, State = #session_state{msg_queue = Queue}) -> State#session_state{msg_queue = emqtt_queue:in(ClientId, Message, Queue)}. -next_msg_id(State = #session_state{ message_id = 16#ffff }) -> - State#session_state{ message_id = 1 }; +next_msg_id(State = #session_state{message_id = 16#ffff}) -> + State#session_state{message_id = 1}; + +next_msg_id(State = #session_state{message_id = MsgId}) -> + State#session_state{message_id = MsgId + 1}. -next_msg_id(State = #session_state{ message_id = MsgId }) -> - State#session_state{ message_id = MsgId + 1 }. -