messages statistics

This commit is contained in:
Ery Lee 2015-03-09 16:41:39 +08:00
parent f2293c118a
commit a5573d0c50
6 changed files with 313 additions and 222 deletions

View File

@ -68,37 +68,38 @@
%% Bytes sent and received of Broker %% Bytes sent and received of Broker
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(SYSTOP_BYTES, [ -define(SYSTOP_BYTES, [
'bytes/received', % Total bytes received {counter, 'bytes/received'}, % Total bytes received
'bytes/sent' % Total bytes sent {counter, 'bytes/sent'} % Total bytes sent
]). ]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Packets sent and received of Broker %% Packets sent and received of Broker
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(SYSTOP_PACKETS, [ -define(SYSTOP_PACKETS, [
'packets/received', % All Packets received {counter, 'packets/received'}, % All Packets received
'packets/sent', % All Packets sent {counter, 'packets/sent'}, % All Packets sent
'packets/connect', % CONNECT Packets received {counter, 'packets/connect'}, % CONNECT Packets received
'packets/connack', % CONNACK Packets sent {counter, 'packets/connack'}, % CONNACK Packets sent
'packets/publish/received', % PUBLISH packets received {counter, 'packets/publish/received'}, % PUBLISH packets received
'packets/publish/sent', % PUBLISH packets sent {counter, 'packets/publish/sent'}, % PUBLISH packets sent
'packets/subscribe', % SUBSCRIBE Packets received {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
'packets/suback', % SUBACK packets sent {counter, 'packets/suback'}, % SUBACK packets sent
'packets/unsubscribe', % UNSUBSCRIBE Packets received {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
'packets/unsuback', % UNSUBACK Packets sent {counter, 'packets/unsuback'}, % UNSUBACK Packets sent
'packets/pingreq', % PINGREQ packets received {counter, 'packets/pingreq'}, % PINGREQ packets received
'packets/pingresp', % PINGRESP Packets sent {counter, 'packets/pingresp'}, % PINGRESP Packets sent
'packets/disconnect' % DISCONNECT Packets received {counter, 'packets/disconnect'} % DISCONNECT Packets received
]). ]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Messages sent and received of broker %% Messages sent and received of broker
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-define(SYSTOP_MESSAGES, [ -define(SYSTOP_MESSAGES, [
'messages/received', % Messages received {counter, 'messages/received'}, % Messages received
'messages/sent', % Messages sent {counter, 'messages/sent'}, % Messages sent
'messages/retained/count',% Messagea retained {gauge, 'messages/retained/count'},% Messagea retained
'messages/stored', % Messages stored {gauge, 'messages/stored/count'}, % Messages stored
'messages/dropped' % Messages dropped {counter, 'messages/dropped'} % Messages dropped
]). ]).

View File

@ -40,8 +40,9 @@
-export([start_link/1]). -export([start_link/1]).
-export([all/0, value/1, -export([all/0, value/1,
inc/1, inc/2, inc/1, inc/2, inc/3,
dec/1, dec/2]). dec/2, dec/3,
set/2]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -82,7 +83,7 @@ all() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Get metric value %% Get metric value.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -92,23 +93,49 @@ value(Metric) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Increase metric value %% Increase counter.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec inc(atom()) -> non_neg_integer(). -spec inc(atom()) -> non_neg_integer().
inc(Metric) -> inc(Metric) ->
inc(Metric, 1). inc(counter, Metric, 1).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Increase metric value %% Increase metric value.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec inc(atom(), pos_integer()) -> pos_integer(). -spec inc(counter | gauge, atom()) -> non_neg_integer().
inc(Metric, Val) -> inc(gauge, Metric) ->
ets:update_counter(?METRIC_TAB, key(Metric), {2, Val}). inc(gauge, Metric, 1);
inc(counter, Metric) ->
inc(counter, Metric, 1);
inc(Metric, Val) when is_atom(Metric) and is_integer(Val) ->
inc(counter, Metric, Val).
%%------------------------------------------------------------------------------
%% @doc
%% Increase metric value.
%%
%% @end
%%------------------------------------------------------------------------------
-spec inc(counter | gauge, atom(), pos_integer()) -> pos_integer().
inc(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
inc(counter, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
%%------------------------------------------------------------------------------
%% @doc
%% Decrease metric value.
%%
%% @end
%%------------------------------------------------------------------------------
-spec dec(gauge, atom()) -> integer().
dec(gauge, Metric) ->
dec(gauge, Metric, 1).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -116,20 +143,20 @@ inc(Metric, Val) ->
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec dec(atom()) -> integer(). -spec dec(gauge, atom(), pos_integer()) -> integer().
dec(Metric) -> dec(gauge, Metric, Val) ->
dec(Metric, 1). ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Decrease metric value %% Set metric value.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec dec(atom(), pos_integer()) -> integer(). set(Metric, Val) when is_atom(Metric) ->
dec(Metric, Val) -> set(gauge, Metric, Val).
%TODO: ok? set(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(Metric), {2, -Val}). ets:insert(?METRIC_TAB, key(gauge, Metric), Val).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -138,38 +165,40 @@ dec(Metric, Val) ->
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
key(Metric) -> key(gauge, Metric) ->
{Metric, 0};
key(counter, Metric) ->
{Metric, erlang:system_info(scheduler_id)}. {Metric, erlang:system_info(scheduler_id)}.
%% ------------------------------------------------------------------ %%%=============================================================================
%% gen_server Function Definitions %%% gen_server callbacks
%% ------------------------------------------------------------------ %%%=============================================================================
init(Options) -> init(Options) ->
random:seed(now()), random:seed(now()),
Topics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table % Create metrics table
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
% Init metrics % Init metrics
[new_metric(Topic) || Topic <- Topics], [new_metric(Metric) || Metric <- Metrics],
% $SYS Topics for metrics % $SYS Topics for metrics
[{atomic, _} = emqtt_pubsub:create(systop(Topic)) || Topic <- Topics], [{atomic, _} = emqtt_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
PubInterval = proplists:get_value(pub_interval, Options, 60), PubInterval = proplists:get_value(pub_interval, Options, 60),
{ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}. {ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}.
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, ok, State}. {stop, {badreq, Req}, State}.
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. {stop, {badmsg, Msg}, State}.
handle_info(tick, State) -> handle_info(tick, State) ->
% publish metric message % publish metric message
[publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()], [publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()],
{noreply, tick(State), hibernate}; {noreply, tick(State), hibernate};
handle_info(_Info, State) -> handle_info(Info, State) ->
{noreply, State}. {stop, {badinfo, Info}, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.
@ -187,7 +216,10 @@ systop(Name) when is_atom(Name) ->
publish(Topic, Payload) -> publish(Topic, Payload) ->
emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}). emqtt_router:route(#mqtt_message{topic = Topic, payload = Payload}).
new_metric(Name) -> new_metric({gauge, Name}) ->
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
new_metric({counter, Name}) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)), Schedulers = lists:seq(1, erlang:system_info(schedulers)),
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
@ -200,3 +232,4 @@ tick(Delay, State) ->
i2b(I) -> i2b(I) ->
list_to_binary(integer_to_list(I)). list_to_binary(integer_to_list(I)).

View File

@ -318,3 +318,4 @@ inc(?PINGRESP) ->
inc(_) -> inc(_) ->
ingore. ingore.

View File

@ -63,7 +63,6 @@
terminate/2, terminate/2,
code_change/3]). code_change/3]).
-record(state, {max_subs = 0}). -record(state, {max_subs = 0}).
%%%============================================================================= %%%=============================================================================
@ -148,14 +147,14 @@ publish(Msg=#mqtt_message{topic=Topic}) ->
-spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any(). -spec publish(Topic :: binary(), Msg :: mqtt_message()) -> any().
publish(Topic, Msg) when is_binary(Topic) -> publish(Topic, Msg) when is_binary(Topic) ->
lists:foreach(fun(#topic{name=Name, node=Node}) -> Count =
lists:foldl(fun(#topic{name=Name, node=Node}, Acc) ->
case Node =:= node() of case Node =:= node() of
true -> dispatch(Name, Msg); true -> dispatch(Name, Msg) + Acc;
false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) false -> rpc:call(Node, ?MODULE, dispatch, [Name, Msg]) + Acc
end end
end, match(Topic)). end, 0, match(Topic)),
dropped(Count =:= 0).
%%TODO: dispatch counts....
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -163,14 +162,18 @@ publish(Topic, Msg) when is_binary(Topic) ->
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
lists:foreach(fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) -> Subscribers = ets:lookup(topic_subscriber, Topic),
lists:foreach(
fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
Msg1 = if Msg1 = if
Qos > SubQos -> Msg#mqtt_message{qos = SubQos}; Qos > SubQos -> Msg#mqtt_message{qos = SubQos};
true -> Msg true -> Msg
end, end,
SubPid ! {dispatch, {self(), Msg1}} SubPid ! {dispatch, {self(), Msg1}}
end, ets:lookup(topic_subscriber, Topic)). end, Subscribers),
length(Subscribers).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -408,4 +411,8 @@ setstats(State = #state{max_subs = Max}) ->
State State
end. end.
dropped(true) ->
emqtt_metrics:inc('messages/dropped');
dropped(false) ->
ok.

View File

@ -1,66 +1,65 @@
%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io> %%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%% %%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal %%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights %%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is %%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions: %%% furnished to do so, subject to the following conditions:
%% %%%
%% The above copyright notice and this permission notice shall be included in all %%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software. %%% copies or substantial portions of the Software.
%% %%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE. %%% SOFTWARE.
%%------------------------------------------------------------------------------ %%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt server. retain messages???
%%% TODO: redesign...
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_server). -module(emqtt_server).
-author('feng@slimpp.io'). -author('feng@slimpp.io').
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-include("emqtt.hrl"). -include("emqtt.hrl").
-include("emqtt_topic.hrl"). -include("emqtt_topic.hrl").
-include("emqtt_packet.hrl"). -include("emqtt_packet.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
-define(RETAINED_TAB, mqtt_retained).
-define(STORE_LIMIT, 100000).
-record(mqtt_retained, {topic, qos, payload}). -record(mqtt_retained, {topic, qos, payload}).
-record(state, {store_limit}). -record(state, {store_limit}).
%% ------------------------------------------------------------------ -define(RETAINED_TAB, mqtt_retained).
%% API Function Exports
%% ------------------------------------------------------------------
%%TODO: subscribe -define(STORE_LIMIT, 1000000).
%% API Function Exports
-export([start_link/1, retain/1, subscribe/2]). -export([start_link/1, retain/1, subscribe/2]).
%% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% ------------------------------------------------------------------ %%%=============================================================================
%% API Function Definitions %%% API
%% ------------------------------------------------------------------ %%%=============================================================================
start_link(RetainOpts) -> -spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
gen_server:start_link({local, ?SERVER}, ?MODULE, [RetainOpts], []). start_link(Opts) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
retain(#mqtt_message{retain = false}) -> ignore; retain(#mqtt_message{retain = false}) -> ignore;
@ -71,26 +70,24 @@ retain(#mqtt_message{ retain = true, topic = Topic, payload = <<>> }) ->
retain(Msg = #mqtt_message{retain = true}) -> retain(Msg = #mqtt_message{retain = true}) ->
gen_server:cast(?SERVER, {retain, Msg}). gen_server:cast(?SERVER, {retain, Msg}).
%% %% TODO: this is not right???
subscribe(Topics, CPid) when is_pid(CPid) -> subscribe(Topics, CPid) when is_pid(CPid) ->
lager:info("Retained Topics: ~p", [match(Topics)]),
RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]), RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]),
lager:info("Retained Messages: ~p", [RetainedMsgs]),
lists:foreach(fun(Msg) -> lists:foreach(fun(Msg) ->
CPid ! {dispatch, {self(), retained_msg(Msg)}} CPid ! {dispatch, {self(), retained_msg(Msg)}}
end, RetainedMsgs). end, RetainedMsgs).
%% ------------------------------------------------------------------ %%%=============================================================================
%% gen_server Function Definitions %%% gen_server callbacks
%% ------------------------------------------------------------------ %%%=============================================================================
init([RetainOpts]) -> init([Opts]) ->
mnesia:create_table(mqtt_retained, [ mnesia:create_table(mqtt_retained, [
{type, ordered_set}, {type, ordered_set},
{ram_copies, [node()]}, {ram_copies, [node()]},
{attributes, record_info(fields, mqtt_retained)}]), {attributes, record_info(fields, mqtt_retained)}]),
mnesia:add_table_copy(mqtt_retained, node(), ram_copies), mnesia:add_table_copy(mqtt_retained, node(), ram_copies),
Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT), Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT),
{ok, #state{store_limit = Limit}}. {ok, #state{store_limit = Limit}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -98,16 +95,17 @@ handle_call(Req, _From, State) ->
handle_cast({retain, Msg = #mqtt_message{qos = Qos, handle_cast({retain, Msg = #mqtt_message{qos = Qos,
topic = Topic, topic = Topic,
payload = Payload }}, State = #state{store_limit = Limit}) -> payload = Payload}},
State = #state{store_limit = Limit}) ->
case mnesia:table_info(?RETAINED_TAB, size) of case mnesia:table_info(?RETAINED_TAB, size) of
Size when Size >= Limit -> Size when Size >= Limit ->
lager:error("Server dropped message(retain) for table is full: ~p", [Msg]); lager:error("Dropped message(retain) for table is full: ~p", [Msg]);
_ -> Size ->
%emqtt_metrics:update('messages/retained', Size), lager:debug("Retained message: ~p", [Msg]),
lager:info("Server retained message: ~p", [Msg]), mnesia:dirty_write(#mqtt_retained{qos = Qos,
mnesia:dirty_write(#mqtt_retained{ topic = Topic, topic = Topic,
qos = Qos, payload = Payload}),
payload = Payload }) emqtt_metrics:set('messages/retained/count', Size)
end, end,
{noreply, State}; {noreply, State};
@ -123,9 +121,10 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%% ------------------------------------------------------------------ %%%=============================================================================
%% Internal Function Definitions %%% Internal functions
%% ------------------------------------------------------------------ %%%=============================================================================
match(Topics) -> match(Topics) ->
RetainedTopics = mnesia:dirty_all_keys(?RETAINED_TAB), RetainedTopics = mnesia:dirty_all_keys(?RETAINED_TAB),
lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]). lists:flatten([match(Topic, RetainedTopics) || Topic <- Topics]).

View File

@ -1,45 +1,50 @@
%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io> %%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
%% %%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy %%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal %%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights %%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell %%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is %%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions: %%% furnished to do so, subject to the following conditions:
%% %%%
%% The above copyright notice and this permission notice shall be included in all %%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software. %%% copies or substantial portions of the Software.
%% %%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR %%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, %%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE %%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER %%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, %%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE. %%% SOFTWARE.
%%------------------------------------------------------------------------------ %%%-----------------------------------------------------------------------------
%%% @doc
%%% emqtt session.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqtt_session). -module(emqtt_session).
-include("emqtt.hrl"). -include("emqtt.hrl").
-include("emqtt_packet.hrl"). -include("emqtt_packet.hrl").
%% ------------------------------------------------------------------
%% API Function Exports %% API Function Exports
%% ------------------------------------------------------------------ -export([start/1,
-export([start/1, resume/3, publish/2, puback/2, subscribe/2, unsubscribe/2, destroy/2]). resume/3,
publish/2,
puback/2,
subscribe/2,
unsubscribe/2,
destroy/2]).
-export([store/2]). -export([store/2]).
%%start gen_server %% Start gen_server
-export([start_link/3]). -export([start_link/3]).
%% ------------------------------------------------------------------
%% gen_server Function Exports %% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -55,9 +60,19 @@
expires, expires,
expire_timer}). expire_timer}).
%% ------------------------------------------------------------------ -type session() :: #session_state{} | pid().
%% Start Session
%% ------------------------------------------------------------------ %%%=============================================================================
%%% Session API
%%%=============================================================================
%%------------------------------------------------------------------------------
%% @doc
%% Start Session.
%%
%% @end
%%------------------------------------------------------------------------------
-spec start({boolean(), binary(), pid()}) -> {ok, session()}.
start({true = _CleanSess, ClientId, _ClientPid}) -> start({true = _CleanSess, ClientId, _ClientPid}) ->
%%Destroy old session if CleanSess is true before. %%Destroy old session if CleanSess is true before.
ok = emqtt_sm:destroy_session(ClientId), ok = emqtt_sm:destroy_session(ClientId),
@ -67,15 +82,26 @@ start({false = _CleanSess, ClientId, ClientPid}) ->
{ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid), {ok, SessPid} = emqtt_sm:start_session(ClientId, ClientPid),
{ok, SessPid}. {ok, SessPid}.
%% ------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Session API %% @doc
%% ------------------------------------------------------------------ %% Resume Session.
%%
%% @end
%%------------------------------------------------------------------------------
-spec resume(session(), binary(), pid()) -> session().
resume(SessState = #session_state{}, _ClientId, _ClientPid) -> resume(SessState = #session_state{}, _ClientId, _ClientPid) ->
SessState; SessState;
resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
gen_server:cast(SessPid, {resume, ClientId, ClientPid}), gen_server:cast(SessPid, {resume, ClientId, ClientPid}),
SessPid. SessPid.
%%------------------------------------------------------------------------------
%% @doc
%% Publish message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session, {?QOS_0, Message}) -> publish(Session, {?QOS_0, Message}) ->
emqtt_router:route(Message), Session; emqtt_router:route(Message), Session;
@ -91,7 +117,13 @@ publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, ?QOS_2, Message}), gen_server:cast(SessPid, {publish, ?QOS_2, Message}),
SessPid. SessPid.
%% PUBACK %%------------------------------------------------------------------------------
%% @doc
%% PubAck message.
%%
%% @end
%%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
case maps:is_key(PacketId, Awaiting) of case maps:is_key(PacketId, Awaiting) of
true -> ok; true -> ok;
@ -116,7 +148,8 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
%% PUBREL %% PUBREL
puback(SessState = #session_state{client_id = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> puback(SessState = #session_state{client_id = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqtt_router:route(Msg); {ok, Msg} -> emqtt_router:route(Msg);
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
@ -138,7 +171,13 @@ puback(SessState = #session_state{ client_id = ClientId,
puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid. gen_server:cast(SessPid, {pubcomp, PacketId}), SessPid.
%% SUBSCRIBE %%------------------------------------------------------------------------------
%% @doc
%% Subscribe Topics.
%%
%% @end
%%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
case Resubs of case Resubs of
@ -155,9 +194,13 @@ subscribe(SessPid, Topics) when is_pid(SessPid) ->
{ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}), {ok, GrantedQos} = gen_server:call(SessPid, {subscribe, Topics}),
{ok, SessPid, GrantedQos}. {ok, SessPid, GrantedQos}.
%%------------------------------------------------------------------------------
%% @doc
%% Unsubscribe Topics.
%% %%
%% @doc UNSUBSCRIBE %% @end
%% %%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}.
unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
%%TODO: refactor later. %%TODO: refactor later.
case Topics -- maps:keys(SubMap) of case Topics -- maps:keys(SubMap) of
@ -173,6 +216,13 @@ unsubscribe(SessPid, Topics) when is_pid(SessPid) ->
gen_server:call(SessPid, {unsubscribe, Topics}), gen_server:call(SessPid, {unsubscribe, Topics}),
{ok, SessPid}. {ok, SessPid}.
%%------------------------------------------------------------------------------
%% @doc
%% Destroy Session.
%%
%% @end
%%------------------------------------------------------------------------------
-spec destroy(SessPid :: pid(), ClientId :: binary()) -> ok.
destroy(SessPid, ClientId) when is_pid(SessPid) -> destroy(SessPid, ClientId) when is_pid(SessPid) ->
gen_server:cast(SessPid, {destroy, ClientId}). gen_server:cast(SessPid, {destroy, ClientId}).
@ -200,16 +250,16 @@ initial_state(ClientId, ClientPid) ->
State = initial_state(ClientId), State = initial_state(ClientId),
State#session_state{client_pid = ClientPid}. State#session_state{client_pid = ClientPid}.
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
start_link(SessOpts, ClientId, ClientPid) -> start_link(SessOpts, ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
%%%=============================================================================
%%% gen_server callbacks
%%%=============================================================================
init([SessOpts, ClientId, ClientPid]) -> init([SessOpts, ClientId, ClientPid]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%%TODO: Is this OK? %%TODO: Is this OK? should monitor...
true = link(ClientPid), true = link(ClientPid),
State = initial_state(ClientId, ClientPid), State = initial_state(ClientId, ClientPid),
Expires = proplists:get_value(expires, SessOpts, 1) * 3600, Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
@ -291,8 +341,9 @@ handle_cast(Msg, State) ->
handle_info({dispatch, {_From, Message}}, State) -> handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)}; {noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{ handle_info({'EXIT', ClientPid, Reason}, State = #session_state{client_id = ClientId,
client_id = ClientId, client_pid = ClientPid, expires = Expires}) -> client_pid = ClientPid,
expires = Expires}) ->
lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
Timer = erlang:send_after(Expires * 1000, self(), session_expired), Timer = erlang:send_after(Expires * 1000, self(), session_expired),
{noreply, State#session_state{client_pid = undefined, expire_timer = Timer}}; {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}};
@ -310,16 +361,15 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
%% ------------------------------------------------------------------ %%%=============================================================================
%% Internal Function Definitions %%% Internal functions
%% ------------------------------------------------------------------ %%%=============================================================================
dispatch(Message, State = #session_state{client_id = ClientId, dispatch(Message, State = #session_state{client_id = ClientId,
client_pid = undefined}) -> client_pid = undefined}) ->
queue(ClientId, Message, State); queue(ClientId, Message, State);
dispatch(Message = #mqtt_message{ qos = ?QOS_0 }, State = #session_state{ dispatch(Message = #mqtt_message{qos = ?QOS_0}, State = #session_state{client_pid = ClientPid}) ->
client_pid = ClientPid }) ->
ClientPid ! {dispatch, {self(), Message}}, ClientPid ! {dispatch, {self(), Message}},
State; State;