client, session, topics, subscribers statatistics
This commit is contained in:
parent
b26f6f1b1d
commit
778c34f11d
|
@ -34,7 +34,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(TABLE, ?MODULE).
|
-define(BROKER_TAB, ?MODULE).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
|
@ -42,7 +42,10 @@
|
||||||
|
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
|
|
||||||
-export([version/0, uptime/0, datetime/0, description/0]).
|
-export([version/0, uptime/0, datetime/0, sysdescr/0]).
|
||||||
|
|
||||||
|
%% Statistics.
|
||||||
|
-export([getstats/0, getstat/1, setstat/2]).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% ------------------------------------------------------------------
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
|
@ -53,42 +56,109 @@
|
||||||
|
|
||||||
-record(state, {started_at, sys_interval, tick_timer}).
|
-record(state, {started_at, sys_interval, tick_timer}).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% API Function Definitions
|
%%% API
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Start emqtt broker.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
|
||||||
start_link(Options) ->
|
start_link(Options) ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Get broker version.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec version() -> string().
|
||||||
version() ->
|
version() ->
|
||||||
{ok, Version} = application:get_key(emqtt, vsn), Version.
|
{ok, Version} = application:get_key(emqtt, vsn), Version.
|
||||||
|
|
||||||
description() ->
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Get broker description.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec sysdescr() -> string().
|
||||||
|
sysdescr() ->
|
||||||
{ok, Descr} = application:get_key(emqtt, description), Descr.
|
{ok, Descr} = application:get_key(emqtt, description), Descr.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Get broker uptime.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec uptime() -> string().
|
||||||
uptime() ->
|
uptime() ->
|
||||||
gen_server:call(?SERVER, uptime).
|
gen_server:call(?SERVER, uptime).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Get broker datetime.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec datetime() -> string().
|
||||||
datetime() ->
|
datetime() ->
|
||||||
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
|
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
|
||||||
lists:flatten(
|
lists:flatten(
|
||||||
io_lib:format(
|
io_lib:format(
|
||||||
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
|
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server Function Definitions
|
%% @doc
|
||||||
%% ------------------------------------------------------------------
|
%% Get broker statistics.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec getstats() -> [{atom(), non_neg_integer()}].
|
||||||
|
getstats() ->
|
||||||
|
ets:tab2list(?BROKER_TAB).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Get stats by name.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec getstat(atom()) -> non_neg_integer() | undefined.
|
||||||
|
getstat(Name) ->
|
||||||
|
case ets:lookup(?BROKER_TAB, Name) of
|
||||||
|
[{Name, Val}] -> Val;
|
||||||
|
[] -> undefined
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Set broker stats.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec setstat(atom(), pos_integer()) -> boolean().
|
||||||
|
setstat(Name, Val) ->
|
||||||
|
ets:insert(?BROKER_TAB, {Name, Val}).
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Options]) ->
|
init([Options]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
SysInterval = proplists:get_value(sys_interval, Options, 60),
|
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS],
|
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_BROKERS],
|
||||||
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS],
|
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_CLIENTS],
|
||||||
|
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_SESSIONS],
|
||||||
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_PUBSUB],
|
[{atomic, _} = create(systop(Name)) || Name <- ?SYSTOP_PUBSUB],
|
||||||
ets:new(?MODULE, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_CLIENTS],
|
SysInterval = proplists:get_value(sys_interval, Options, 60),
|
||||||
[ets:insert(?TABLE, {Name, 0}) || Name <- ?SYSTOP_PUBSUB],
|
|
||||||
% retain version, description
|
|
||||||
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
|
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
|
||||||
{ok, tick(random:uniform(SysInterval), State)}.
|
{ok, tick(random:uniform(SysInterval), State)}.
|
||||||
|
|
||||||
|
@ -103,13 +173,11 @@ handle_cast(_Msg, State) ->
|
||||||
|
|
||||||
handle_info(tick, State) ->
|
handle_info(tick, State) ->
|
||||||
retain(systop(version), list_to_binary(version())),
|
retain(systop(version), list_to_binary(version())),
|
||||||
retain(systop(description), list_to_binary(description())),
|
retain(systop(sysdescr), list_to_binary(sysdescr())),
|
||||||
publish(systop(uptime), list_to_binary(uptime(State))),
|
publish(systop(uptime), list_to_binary(uptime(State))),
|
||||||
publish(systop(datetime), list_to_binary(datetime())),
|
publish(systop(datetime), list_to_binary(datetime())),
|
||||||
%%TODO... call emqtt_cm here?
|
[publish(systop(Stat), i2b(Val))
|
||||||
[publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_cm:stats()],
|
|| {Stat, Val} <- ets:tab2list(?BROKER_TAB)],
|
||||||
%%TODO... call emqtt_pubsub here?
|
|
||||||
[publish(systop(Stat), i2b(Val)) || {Stat, Val} <- emqtt_pubsub:stats()],
|
|
||||||
{noreply, tick(State)};
|
{noreply, tick(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -121,9 +189,10 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% Internal Function Definitions
|
%%% Internal functions
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
systop(Name) when is_atom(Name) ->
|
systop(Name) when is_atom(Name) ->
|
||||||
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
|
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
|
||||||
|
|
||||||
|
|
|
@ -219,6 +219,7 @@ received_stats(?PACKET(Type)) ->
|
||||||
inc(?CONNECT) ->
|
inc(?CONNECT) ->
|
||||||
emqtt_metrics:inc('packets/connect');
|
emqtt_metrics:inc('packets/connect');
|
||||||
inc(?PUBLISH) ->
|
inc(?PUBLISH) ->
|
||||||
|
emqtt_metrics:inc('messages/received'),
|
||||||
emqtt_metrics:inc('packets/publish/received');
|
emqtt_metrics:inc('packets/publish/received');
|
||||||
inc(?SUBSCRIBE) ->
|
inc(?SUBSCRIBE) ->
|
||||||
emqtt_metrics:inc('packets/subscribe');
|
emqtt_metrics:inc('packets/subscribe');
|
||||||
|
|
|
@ -1,26 +1,29 @@
|
||||||
%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
%%
|
%%%
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
%% in the Software without restriction, including without limitation the rights
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
%% furnished to do so, subject to the following conditions:
|
%%% furnished to do so, subject to the following conditions:
|
||||||
%%
|
%%%
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
%% copies or substantial portions of the Software.
|
%%% copies or substantial portions of the Software.
|
||||||
%%
|
%%%
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
%client manager
|
%%% emqtt client manager.
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_cm).
|
-module(emqtt_cm).
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
@ -29,21 +32,16 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(TABLE, emqtt_client).
|
-define(CLIENT_TAB, emqtt_client).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% API Function Exports
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
%% API Exports
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([lookup/1, register/2, unregister/2]).
|
-export([lookup/1, register/2, unregister/2]).
|
||||||
|
|
||||||
-export([stats/0]).
|
-export([getstats/0]).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1,
|
||||||
handle_call/3,
|
handle_call/3,
|
||||||
|
@ -54,15 +52,26 @@
|
||||||
|
|
||||||
-record(state, {max = 0}).
|
-record(state, {max = 0}).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% API Function Definitions
|
%%% API
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Start client manager.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Lookup client pid with clientId.
|
||||||
%%
|
%%
|
||||||
%% @doc lookup client pid with clientId.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
-spec lookup(ClientId :: binary()) -> pid() | undefined.
|
||||||
lookup(ClientId) when is_binary(ClientId) ->
|
lookup(ClientId) when is_binary(ClientId) ->
|
||||||
case ets:lookup(emqtt_client, ClientId) of
|
case ets:lookup(emqtt_client, ClientId) of
|
||||||
|
@ -70,33 +79,45 @@ lookup(ClientId) when is_binary(ClientId) ->
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Register clientId with pid.
|
||||||
%%
|
%%
|
||||||
%% @doc register clientId with pid.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
-spec register(ClientId :: binary(), Pid :: pid()) -> ok.
|
-spec register(ClientId :: binary(), Pid :: pid()) -> ok.
|
||||||
register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
|
register(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
|
||||||
gen_server:call(?SERVER, {register, ClientId, Pid}).
|
gen_server:call(?SERVER, {register, ClientId, Pid}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Unregister clientId with pid.
|
||||||
%%
|
%%
|
||||||
%% @doc unregister clientId with pid.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
-spec unregister(ClientId :: binary(), Pid :: pid()) -> ok.
|
-spec unregister(ClientId :: binary(), Pid :: pid()) -> ok.
|
||||||
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
|
unregister(ClientId, Pid) when is_binary(ClientId), is_pid(Pid) ->
|
||||||
gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
|
gen_server:cast(?SERVER, {unregister, ClientId, Pid}).
|
||||||
|
|
||||||
stats() ->
|
%%------------------------------------------------------------------------------
|
||||||
gen_server:call(?SERVER, stats).
|
%% @doc
|
||||||
|
%% Get statistics of client manager.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
getstats() ->
|
||||||
|
gen_server:call(?SERVER, getstats).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% gen_server Function Definitions
|
%%% gen_server callbacks
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ets:new(?TABLE, [set, named_table, protected]),
|
ets:new(?CLIENT_TAB, [set, named_table, protected]),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call({register, ClientId, Pid}, _From, State) ->
|
handle_call({register, ClientId, Pid}, _From, State) ->
|
||||||
case ets:lookup(?TABLE, ClientId) of
|
case ets:lookup(?CLIENT_TAB, ClientId) of
|
||||||
[{_, Pid, _}] ->
|
[{_, Pid, _}] ->
|
||||||
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
lager:error("clientId '~s' has been registered with ~p", [ClientId, Pid]),
|
||||||
ignore;
|
ignore;
|
||||||
|
@ -107,10 +128,10 @@ handle_call({register, ClientId, Pid}, _From, State) ->
|
||||||
[] ->
|
[] ->
|
||||||
insert(ClientId, Pid)
|
insert(ClientId, Pid)
|
||||||
end,
|
end,
|
||||||
{reply, ok, set_max(State)};
|
{reply, ok, setstats(State)};
|
||||||
|
|
||||||
handle_call(stats, _From, State = #state{max = Max}) ->
|
handle_call(getstats, _From, State = #state{max = Max}) ->
|
||||||
Stats = [{'clients/total', ets:info(?TABLE, size)},
|
Stats = [{'clients/count', ets:info(?CLIENT_TAB, size)},
|
||||||
{'clients/max', Max}],
|
{'clients/max', Max}],
|
||||||
{reply, Stats, State};
|
{reply, Stats, State};
|
||||||
|
|
||||||
|
@ -118,23 +139,23 @@ handle_call(_Request, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
handle_cast({unregister, ClientId, Pid}, State) ->
|
handle_cast({unregister, ClientId, Pid}, State) ->
|
||||||
case ets:lookup(?TABLE, ClientId) of
|
case ets:lookup(?CLIENT_TAB, ClientId) of
|
||||||
[{_, Pid, MRef}] ->
|
[{_, Pid, MRef}] ->
|
||||||
erlang:demonitor(MRef),
|
erlang:demonitor(MRef),
|
||||||
ets:delete(?TABLE, ClientId);
|
ets:delete(?CLIENT_TAB, ClientId);
|
||||||
[_] ->
|
[_] ->
|
||||||
ignore;
|
ignore;
|
||||||
[] ->
|
[] ->
|
||||||
lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
|
lager:error("cannot find clientId '~s' with ~p", [ClientId, Pid])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
ets:match_delete(?TABLE, {{'_', DownPid, MRef}}),
|
ets:match_delete(?CLIENT_TAB, {{'_', DownPid, MRef}}),
|
||||||
{noreply, State};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -145,16 +166,22 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% Internal Function Definitions
|
%%% Internal functions
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
insert(ClientId, Pid) ->
|
insert(ClientId, Pid) ->
|
||||||
ets:insert(?TABLE, {ClientId, Pid, erlang:monitor(process, Pid)}).
|
ets:insert(?CLIENT_TAB, {ClientId, Pid, erlang:monitor(process, Pid)}).
|
||||||
|
|
||||||
set_max(State = #state{max = Max}) ->
|
setstats(State = #state{max = Max}) ->
|
||||||
Total = ets:info(?TABLE, size),
|
Count = ets:info(?CLIENT_TAB, size),
|
||||||
|
emqtt_broker:setstat('client/count', Count),
|
||||||
if
|
if
|
||||||
Total > Max -> State#state{max = Total};
|
Count > Max ->
|
||||||
true -> State
|
emqtt_broker:setstat('client/max', Count),
|
||||||
|
State#state{max = Count};
|
||||||
|
true ->
|
||||||
|
State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -307,6 +307,7 @@ sent_stats(?PACKET(Type)) ->
|
||||||
inc(?CONNACK) ->
|
inc(?CONNACK) ->
|
||||||
emqtt_metrics:inc('packets/connack');
|
emqtt_metrics:inc('packets/connack');
|
||||||
inc(?PUBLISH) ->
|
inc(?PUBLISH) ->
|
||||||
|
emqtt_metrics:inc('messages/sent'),
|
||||||
emqtt_metrics:inc('packets/publish/sent');
|
emqtt_metrics:inc('packets/publish/sent');
|
||||||
inc(?SUBACK) ->
|
inc(?SUBACK) ->
|
||||||
emqtt_metrics:inc('packets/suback');
|
emqtt_metrics:inc('packets/suback');
|
||||||
|
|
|
@ -1,29 +1,37 @@
|
||||||
%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
%%
|
%%%
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
%% in the Software without restriction, including without limitation the rights
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
%% furnished to do so, subject to the following conditions:
|
%%% furnished to do so, subject to the following conditions:
|
||||||
%%
|
%%%
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
%% copies or substantial portions of the Software.
|
%%% copies or substantial portions of the Software.
|
||||||
%%
|
%%%
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%% emqtt core pubsub.
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqtt_pubsub).
|
-module(emqtt_pubsub).
|
||||||
|
|
||||||
-author('feng@emqtt.io').
|
-author('feng@emqtt.io').
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
|
||||||
-include("emqtt_topic.hrl").
|
-include("emqtt_topic.hrl").
|
||||||
|
@ -32,11 +40,9 @@
|
||||||
|
|
||||||
-include_lib("stdlib/include/qlc.hrl").
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%% API Exports
|
||||||
%% API Function Exports
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0, getstats/0]).
|
||||||
|
|
||||||
-export([topics/0,
|
-export([topics/0,
|
||||||
create/1,
|
create/1,
|
||||||
|
@ -48,15 +54,7 @@
|
||||||
dispatch/2,
|
dispatch/2,
|
||||||
match/1]).
|
match/1]).
|
||||||
|
|
||||||
-export([stats/0]).
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
-behaviour(gen_server).
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1,
|
||||||
handle_call/3,
|
handle_call/3,
|
||||||
|
@ -65,68 +63,85 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-ifdef(use_specs).
|
|
||||||
|
|
||||||
-spec topics() -> list(topic()).
|
|
||||||
|
|
||||||
-spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}.
|
|
||||||
|
|
||||||
-spec unsubscribe(binary() | list(binary()), pid()) -> ok.
|
|
||||||
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-record(state, {max_subs = 0}).
|
-record(state, {max_subs = 0}).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% API Function Definitions
|
%%% API
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Start Pubsub.
|
||||||
%%
|
%%
|
||||||
%% @doc Start Pubsub.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
stats() ->
|
%%------------------------------------------------------------------------------
|
||||||
gen_server:call(?SERVER, stats).
|
%% @doc
|
||||||
|
%% Get stats of PubSub.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec getstats() -> [{atom(), non_neg_integer()}].
|
||||||
|
getstats() ->
|
||||||
|
gen_server:call(?SERVER, getstats).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% All Topics.
|
||||||
%%
|
%%
|
||||||
%% @doc All topics
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec topics() -> list(binary()).
|
||||||
topics() ->
|
topics() ->
|
||||||
mnesia:dirty_all_keys(topic).
|
mnesia:dirty_all_keys(topic).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Create static topic.
|
||||||
%%
|
%%
|
||||||
%% @doc Create static topic.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}.
|
||||||
create(Topic) ->
|
create(Topic) ->
|
||||||
gen_server:call(?SERVER, {create, Topic}).
|
gen_server:call(?SERVER, {create, Topic}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Subscribe Topic or Topics
|
||||||
%%
|
%%
|
||||||
%% @doc Subscribe Topic or Topics
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec subscribe({binary(), mqtt_qos()} | list(), pid()) -> {ok, list(mqtt_qos())}.
|
||||||
subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
subscribe({Topic, Qos}, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
||||||
subscribe([{Topic, Qos}], SubPid);
|
subscribe([{Topic, Qos}], SubPid);
|
||||||
|
|
||||||
subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
subscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
||||||
gen_server:call(?SERVER, {subscribe, Topics, SubPid}).
|
gen_server:call(?SERVER, {subscribe, Topics, SubPid}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Unsubscribe Topic or Topics
|
||||||
%%
|
%%
|
||||||
%% @doc Unsubscribe Topic or Topics
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec unsubscribe(binary() | list(binary()), pid()) -> ok.
|
||||||
unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
unsubscribe(Topic, SubPid) when is_binary(Topic) and is_pid(SubPid) ->
|
||||||
unsubscribe([Topic], SubPid);
|
unsubscribe([Topic], SubPid);
|
||||||
|
|
||||||
unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
unsubscribe(Topics, SubPid) when is_list(Topics) and is_pid(SubPid) ->
|
||||||
gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}).
|
gen_server:cast(?SERVER, {unsubscribe, Topics, SubPid}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Publish to cluster node.
|
||||||
%%
|
%%
|
||||||
%% @doc Publish to cluster node.
|
%% @end
|
||||||
%%
|
%%------------------------------------------------------------------------------
|
||||||
-spec publish(Msg :: mqtt_message()) -> ok.
|
-spec publish(Msg :: mqtt_message()) -> ok.
|
||||||
publish(Msg=#mqtt_message{topic=Topic}) ->
|
publish(Msg=#mqtt_message{topic=Topic}) ->
|
||||||
publish(Topic, Msg).
|
publish(Topic, Msg).
|
||||||
|
@ -140,7 +155,14 @@ publish(Topic, Msg) when is_binary(Topic) ->
|
||||||
end
|
end
|
||||||
end, match(Topic)).
|
end, match(Topic)).
|
||||||
|
|
||||||
%dispatch locally, should only be called by publish
|
%%TODO: dispatch counts....
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Dispatch Locally. Should only be called by publish.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
||||||
lists:foreach(fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
|
lists:foreach(fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
|
||||||
Msg1 = if
|
Msg1 = if
|
||||||
|
@ -150,6 +172,13 @@ dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
||||||
SubPid ! {dispatch, {self(), Msg1}}
|
SubPid ! {dispatch, {self(), Msg1}}
|
||||||
end, ets:lookup(topic_subscriber, Topic)).
|
end, ets:lookup(topic_subscriber, Topic)).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% @private
|
||||||
|
%% Match topic.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
-spec match(Topic :: binary()) -> [topic()].
|
-spec match(Topic :: binary()) -> [topic()].
|
||||||
match(Topic) when is_binary(Topic) ->
|
match(Topic) when is_binary(Topic) ->
|
||||||
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
|
TrieNodes = mnesia:async_dirty(fun trie_match/1, [emqtt_topic:words(Topic)]),
|
||||||
|
@ -178,15 +207,15 @@ init([]) ->
|
||||||
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
|
ets:new(topic_subscriber, [bag, named_table, {keypos, 2}]),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call(stats, _From, State = #state{max_subs = Max}) ->
|
handle_call(getstats, _From, State = #state{max_subs = Max}) ->
|
||||||
Stats = [{'topics/total', mnesia:table_info(topic, size)},
|
Stats = [{'topics/count', mnesia:table_info(topic, size)},
|
||||||
{'subscribers/total', ets:info(topic_subscriber, size)},
|
{'subscribers/count', ets:info(topic_subscriber, size)},
|
||||||
{'subscribers/max', Max}],
|
{'subscribers/max', Max}],
|
||||||
{reply, Stats, State};
|
{reply, Stats, State};
|
||||||
|
|
||||||
handle_call({create, Topic}, _From, State) ->
|
handle_call({create, Topic}, _From, State) ->
|
||||||
Result = mnesia:transaction(fun trie_add/1, [Topic]),
|
Result = mnesia:transaction(fun trie_add/1, [Topic]),
|
||||||
{reply, Result , State};
|
{reply, Result, setstats(State)};
|
||||||
|
|
||||||
handle_call({subscribe, Topics, SubPid}, _From, State) ->
|
handle_call({subscribe, Topics, SubPid}, _From, State) ->
|
||||||
Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
|
Result = [subscribe_topic({Topic, Qos}, SubPid) || {Topic, Qos} <- Topics],
|
||||||
|
@ -195,7 +224,7 @@ handle_call({subscribe, Topics, SubPid}, _From, State) ->
|
||||||
[] -> {ok, [Qos || {ok, Qos} <- Result]};
|
[] -> {ok, [Qos || {ok, Qos} <- Result]};
|
||||||
Errors -> hd(Errors)
|
Errors -> hd(Errors)
|
||||||
end,
|
end,
|
||||||
{reply, Reply, set_maxsubs(State)};
|
{reply, Reply, setstats(State)};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
{stop, {badreq, Req}, State}.
|
{stop, {badreq, Req}, State}.
|
||||||
|
@ -205,7 +234,7 @@ handle_cast({unsubscribe, Topics, SubPid}, State) ->
|
||||||
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
|
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
|
||||||
try_remove_topic(Topic)
|
try_remove_topic(Topic)
|
||||||
end, Topics),
|
end, Topics),
|
||||||
{noreply, State};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
{stop, {badmsg, Msg}, State}.
|
{stop, {badmsg, Msg}, State}.
|
||||||
|
@ -221,7 +250,7 @@ handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
||||||
[ets:delete_object(topic_subscriber, Sub) || Sub <- Subs],
|
[ets:delete_object(topic_subscriber, Sub) || Sub <- Subs],
|
||||||
[try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
[try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
{stop, {badinfo, Info}, State}.
|
{stop, {badinfo, Info}, State}.
|
||||||
|
@ -232,9 +261,9 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% Internal Function Definitions
|
%%% Internal functions
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
subscribe_topic({Topic, Qos}, SubPid) ->
|
subscribe_topic({Topic, Qos}, SubPid) ->
|
||||||
case mnesia:transaction(fun trie_add/1, [Topic]) of
|
case mnesia:transaction(fun trie_add/1, [Topic]) of
|
||||||
{atomic, _} ->
|
{atomic, _} ->
|
||||||
|
@ -367,9 +396,16 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
||||||
throw({notfound, NodeId})
|
throw({notfound, NodeId})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
set_maxsubs(State = #state{max_subs = Max}) ->
|
setstats(State = #state{max_subs = Max}) ->
|
||||||
Total = ets:info(topic_subscriber, size),
|
emqtt_broker:setstat('topics/count', mnesia:table_info(topic, size)),
|
||||||
|
SubCount = ets:info(topic_subscriber, size),
|
||||||
|
emqtt_broker:setstat('subscribers/count', SubCount),
|
||||||
if
|
if
|
||||||
Total > Max -> State#state{max_subs = Total};
|
SubCount > Max ->
|
||||||
true -> State
|
emqtt_broker:setstat('subscribers/max', SubCount),
|
||||||
|
State#state{max_subs = SubCount};
|
||||||
|
true ->
|
||||||
|
State
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -103,6 +103,7 @@ handle_cast({retain, Msg = #mqtt_message{ qos = Qos,
|
||||||
Size when Size >= Limit ->
|
Size when Size >= Limit ->
|
||||||
lager:error("Server dropped message(retain) for table is full: ~p", [Msg]);
|
lager:error("Server dropped message(retain) for table is full: ~p", [Msg]);
|
||||||
_ ->
|
_ ->
|
||||||
|
%emqtt_metrics:update('messages/retained', Size),
|
||||||
lager:info("Server retained message: ~p", [Msg]),
|
lager:info("Server retained message: ~p", [Msg]),
|
||||||
mnesia:dirty_write(#mqtt_retained{ topic = Topic,
|
mnesia:dirty_write(#mqtt_retained{ topic = Topic,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
|
|
|
@ -1,44 +1,41 @@
|
||||||
%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
%%
|
%%%
|
||||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
%% of this software and associated documentation files (the "Software"), to deal
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
%% in the Software without restriction, including without limitation the rights
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
%% copies of the Software, and to permit persons to whom the Software is
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
%% furnished to do so, subject to the following conditions:
|
%%% furnished to do so, subject to the following conditions:
|
||||||
%%
|
%%%
|
||||||
%% The above copyright notice and this permission notice shall be included in all
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
%% copies or substantial portions of the Software.
|
%%% copies or substantial portions of the Software.
|
||||||
%%
|
%%%
|
||||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%% emqtt session manager.
|
||||||
%%------------------------------------------------------------------------------
|
%%%
|
||||||
%%
|
%%% The Session state in the Server consists of:
|
||||||
%% The Session state in the Server consists of:
|
%%% The existence of a Session, even if the rest of the Session state is empty.
|
||||||
%% The existence of a Session, even if the rest of the Session state is empty.
|
%%% The Client’s subscriptions.
|
||||||
%% The Client’s subscriptions.
|
%%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not
|
||||||
%% QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely
|
%%% been completely acknowledged.
|
||||||
%% acknowledged.
|
%%% QoS 1 and QoS 2 messages pending transmission to the Client.
|
||||||
%% QoS 1 and QoS 2 messages pending transmission to the Client.
|
%%% QoS 2 messages which have been received from the Client, but have not been
|
||||||
%% QoS 2 messages which have been received from the Client, but have not been completely
|
%%% completely acknowledged.
|
||||||
%% acknowledged.
|
%%% Optionally, QoS 0 messages pending transmission to the Client.
|
||||||
%% Optionally, QoS 0 messages pending transmission to the Client.
|
%%%
|
||||||
%%
|
%%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqtt_sm).
|
-module(emqtt_sm).
|
||||||
|
|
||||||
%%emqtt session manager...
|
|
||||||
|
|
||||||
%%cleanSess: true | false
|
%%cleanSess: true | false
|
||||||
|
|
||||||
-include("emqtt.hrl").
|
-include("emqtt.hrl").
|
||||||
|
@ -47,73 +44,71 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-define(TABLE, emqtt_session).
|
-define(SESSION_TAB, emqtt_session).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([lookup_session/1, start_session/2, destroy_session/1]).
|
-export([lookup_session/1, start_session/2, destroy_session/1]).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-record(state, {max = 0}).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
%%% API
|
||||||
-ifdef(use_specs).
|
%%%=============================================================================
|
||||||
|
-spec start_link() -> {ok, pid()} | ignore | {error, any()}.
|
||||||
-spec(start_link/0 :: () -> {ok, pid()}).
|
|
||||||
|
|
||||||
-spec(lookup_session/1 :: (binary()) -> pid() | undefined).
|
|
||||||
|
|
||||||
-spec(start_session/2 :: (binary(), pid()) -> {ok, pid()} | {error, any()}).
|
|
||||||
|
|
||||||
-spec(destroy_session/1 :: (binary()) -> ok).
|
|
||||||
|
|
||||||
-endif.
|
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
|
||||||
|
|
||||||
-record(state, {}).
|
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
%% API Function Definitions
|
|
||||||
%% ------------------------------------------------------------------
|
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Lookup Session Pid.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec lookup_session(binary()) -> pid() | undefined.
|
||||||
lookup_session(ClientId) ->
|
lookup_session(ClientId) ->
|
||||||
case ets:lookup(?TABLE, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, _}] -> SessPid;
|
[{_, SessPid, _}] -> SessPid;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Start Session.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_session(binary(), pid()) -> {ok, pid()} | {error, any()}.
|
||||||
start_session(ClientId, ClientPid) ->
|
start_session(ClientId, ClientPid) ->
|
||||||
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
|
gen_server:call(?SERVER, {start_session, ClientId, ClientPid}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Destroy Session.
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec destroy_session(binary()) -> ok.
|
||||||
destroy_session(ClientId) ->
|
destroy_session(ClientId) ->
|
||||||
gen_server:call(?SERVER, {destroy_session, ClientId}).
|
gen_server:call(?SERVER, {destroy_session, ClientId}).
|
||||||
|
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
%% gen_server Function Definitions
|
%%% gen_server callbacks
|
||||||
%% ------------------------------------------------------------------
|
%%%=============================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
ets:new(?TABLE, [set, protected, named_table]),
|
ets:new(?SESSION_TAB, [set, protected, named_table]),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||||
Reply =
|
Reply =
|
||||||
case ets:lookup(?TABLE, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, _MRef}] ->
|
[{_, SessPid, _MRef}] ->
|
||||||
emqtt_session:resume(SessPid, ClientId, ClientPid),
|
emqtt_session:resume(SessPid, ClientId, ClientPid),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
|
@ -121,24 +116,24 @@ handle_call({start_session, ClientId, ClientPid}, _From, State) ->
|
||||||
case emqtt_session_sup:start_session(ClientId, ClientPid) of
|
case emqtt_session_sup:start_session(ClientId, ClientPid) of
|
||||||
{ok, SessPid} ->
|
{ok, SessPid} ->
|
||||||
MRef = erlang:monitor(process, SessPid),
|
MRef = erlang:monitor(process, SessPid),
|
||||||
ets:insert(?TABLE, {ClientId, SessPid, MRef}),
|
ets:insert(?SESSION_TAB, {ClientId, SessPid, MRef}),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{reply, Reply, State};
|
{reply, Reply, setstats(State)};
|
||||||
|
|
||||||
handle_call({destroy_session, ClientId}, _From, State) ->
|
handle_call({destroy_session, ClientId}, _From, State) ->
|
||||||
case ets:lookup(?TABLE, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, MRef}] ->
|
[{_, SessPid, MRef}] ->
|
||||||
erlang:demonitor(MRef),
|
erlang:demonitor(MRef),
|
||||||
emqtt_session:destroy(SessPid, ClientId),
|
emqtt_session:destroy(SessPid, ClientId),
|
||||||
ets:delete(?TABLE, ClientId);
|
ets:delete(?SESSION_TAB, ClientId);
|
||||||
[] ->
|
[] ->
|
||||||
ignore
|
ignore
|
||||||
end,
|
end,
|
||||||
{reply, ok, State};
|
{reply, ok, setstats(State)};
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
@ -147,8 +142,8 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
ets:match_delete(?TABLE, {{'_', DownPid, MRef}}),
|
ets:match_delete(?SESSION_TAB, {{'_', DownPid, MRef}}),
|
||||||
{noreply, State};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -159,4 +154,18 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%=============================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%=============================================================================
|
||||||
|
|
||||||
|
setstats(State = #state{max = Max}) ->
|
||||||
|
Count = ets:info(?SESSION_TAB, size),
|
||||||
|
emqtt_broker:setstat('sessions/count', Count),
|
||||||
|
if
|
||||||
|
Count > Max ->
|
||||||
|
emqtt_broker:setstat('sessions/max', Count),
|
||||||
|
State#state{max = Count};
|
||||||
|
true ->
|
||||||
|
State
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue