tick
This commit is contained in:
parent
eb84783d4a
commit
e88875d2af
|
@ -67,19 +67,17 @@ print_vsn() ->
|
||||||
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
|
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
|
||||||
|
|
||||||
start_servers(Sup) ->
|
start_servers(Sup) ->
|
||||||
{ok, SessOpts} = application:get_env(mqtt_session),
|
|
||||||
{ok, PubSubOpts} = application:get_env(pubsub),
|
{ok, PubSubOpts} = application:get_env(pubsub),
|
||||||
{ok, BrokerOpts} = application:get_env(broker),
|
{ok, BrokerOpts} = application:get_env(broker),
|
||||||
{ok, MetricOpts} = application:get_env(metrics),
|
{ok, MetricOpts} = application:get_env(metrics),
|
||||||
{ok, AccessOpts} = application:get_env(access_control),
|
{ok, AccessOpts} = application:get_env(access_control),
|
||||||
Servers = [
|
Servers = [
|
||||||
{"emqttd config", emqttd_config},
|
|
||||||
{"emqttd event", emqttd_event},
|
{"emqttd event", emqttd_event},
|
||||||
{"emqttd trace", emqttd_trace},
|
{"emqttd trace", emqttd_trace},
|
||||||
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
|
||||||
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
{"emqttd client manager", {supervisor, emqttd_cm_sup}},
|
||||||
{"emqttd session manager", emqttd_sm},
|
{"emqttd session manager", emqttd_sm},
|
||||||
{"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts},
|
{"emqttd session supervisor", {supervisor, emqttd_session_sup}},
|
||||||
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
|
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts},
|
||||||
%{"emqttd router", emqttd_router},
|
%{"emqttd router", emqttd_router},
|
||||||
{"emqttd broker", emqttd_broker, BrokerOpts},
|
{"emqttd broker", emqttd_broker, BrokerOpts},
|
||||||
|
|
|
@ -37,7 +37,7 @@
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([start_link/1]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([all/0, value/1,
|
-export([all/0, value/1,
|
||||||
inc/1, inc/2, inc/3,
|
inc/1, inc/2, inc/3,
|
||||||
|
@ -50,7 +50,7 @@
|
||||||
|
|
||||||
-define(METRIC_TAB, mqtt_metric).
|
-define(METRIC_TAB, mqtt_metric).
|
||||||
|
|
||||||
-record(state, {pub_interval, tick_timer}).
|
-record(state, {tick}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -60,9 +60,9 @@
|
||||||
%% @doc Start metrics server
|
%% @doc Start metrics server
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}.
|
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
|
||||||
start_link(Options) ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Get all metrics
|
%% @doc Get all metrics
|
||||||
|
@ -155,8 +155,9 @@ key(counter, Metric) ->
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Options]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
|
{ok, BrokerOpts} = application:get_env(mqtt_broker),
|
||||||
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
|
||||||
% Create metrics table
|
% Create metrics table
|
||||||
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
|
@ -164,12 +165,9 @@ init([Options]) ->
|
||||||
[new_metric(Metric) || Metric <- Metrics],
|
[new_metric(Metric) || Metric <- Metrics],
|
||||||
% $SYS Topics for metrics
|
% $SYS Topics for metrics
|
||||||
[ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
|
[ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
|
||||||
PubInterval = proplists:get_value(pub_interval, Options, 60),
|
% Tick to publish stats
|
||||||
Delay = if
|
Tick = emqttd_tick:new(proplists:get_value(sys_interval, BrokerOpts, 60)),
|
||||||
PubInterval == 0 -> 0;
|
{ok, #state{tick = Tick}, hibernate}.
|
||||||
true -> random:uniform(PubInterval)
|
|
||||||
end,
|
|
||||||
{ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}.
|
|
||||||
|
|
||||||
handle_call(_Req, _From, State) ->
|
handle_call(_Req, _From, State) ->
|
||||||
{reply, {error, badreq}, State}.
|
{reply, {error, badreq}, State}.
|
||||||
|
@ -177,10 +175,10 @@ handle_call(_Req, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(tick, State) ->
|
handle_info(tick, State = #state{tick = Tick}) ->
|
||||||
% publish metric message
|
% publish metric message
|
||||||
[publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()],
|
[publish(Metric, Val) || {Metric, Val} <- all()],
|
||||||
{noreply, tick(State), hibernate};
|
{noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -195,12 +193,10 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
systop(Name) when is_atom(Name) ->
|
publish(Metric, Val) ->
|
||||||
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
|
emqttd_pubsub:publish(metrics, #mqtt_message{
|
||||||
|
topic = emqtt_topic:systop(Metric),
|
||||||
publish(Topic, Payload) ->
|
payload = emqttd_utils:integer_to_binary(Val)}).
|
||||||
emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic,
|
|
||||||
payload = Payload}).
|
|
||||||
|
|
||||||
new_metric({gauge, Name}) ->
|
new_metric({gauge, Name}) ->
|
||||||
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
||||||
|
@ -209,14 +205,3 @@ new_metric({counter, Name}) ->
|
||||||
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
|
||||||
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
[ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers].
|
||||||
|
|
||||||
tick(State = #state{pub_interval = PubInterval}) ->
|
|
||||||
tick(PubInterval, State).
|
|
||||||
|
|
||||||
tick(0, State) ->
|
|
||||||
State;
|
|
||||||
tick(Delay, State) ->
|
|
||||||
State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}.
|
|
||||||
|
|
||||||
i2b(I) ->
|
|
||||||
list_to_binary(integer_to_list(I)).
|
|
||||||
|
|
||||||
|
|
|
@ -247,17 +247,18 @@ initial_state(ClientId, ClientPid) ->
|
||||||
%% @doc Start a session process.
|
%% @doc Start a session process.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
start_link(SessOpts, ClientId, ClientPid) ->
|
start_link(ClientId, ClientPid) ->
|
||||||
gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []).
|
gen_server:start_link(?MODULE, [ClientId, ClientPid], []).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([SessOpts, ClientId, ClientPid]) ->
|
init([ClientId, ClientPid]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
%%TODO: Is this OK? should monitor...
|
%%TODO: Is this OK? or should monitor...
|
||||||
true = link(ClientPid),
|
true = link(ClientPid),
|
||||||
|
{ok, SessOpts} = application:get_env(mqtt_session),
|
||||||
State = initial_state(ClientId, ClientPid),
|
State = initial_state(ClientId, ClientPid),
|
||||||
Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
|
Expires = proplists:get_value(expires, SessOpts, 1) * 3600,
|
||||||
MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000),
|
MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000),
|
||||||
|
|
|
@ -30,16 +30,22 @@
|
||||||
|
|
||||||
-behavior(supervisor).
|
-behavior(supervisor).
|
||||||
|
|
||||||
-export([start_link/1, start_session/2]).
|
-export([start_link/0, start_session/2]).
|
||||||
|
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
|
||||||
%TODO: FIX COMMENTS...
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Start session supervisor
|
||||||
-spec start_link([tuple()]) -> {ok, pid()}.
|
%% @end
|
||||||
start_link(SessOpts) ->
|
%%------------------------------------------------------------------------------
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]).
|
-spec start_link() -> {ok, pid()}.
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Start a session
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_session(binary(), pid()) -> {ok, pid()}.
|
-spec start_session(binary(), pid()) -> {ok, pid()}.
|
||||||
start_session(ClientId, ClientPid) ->
|
start_session(ClientId, ClientPid) ->
|
||||||
supervisor:start_child(?MODULE, [ClientId, ClientPid]).
|
supervisor:start_child(?MODULE, [ClientId, ClientPid]).
|
||||||
|
@ -48,8 +54,8 @@ start_session(ClientId, ClientPid) ->
|
||||||
%%% Supervisor callbacks
|
%%% Supervisor callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([SessOpts]) ->
|
init([]) ->
|
||||||
{ok, {{simple_one_for_one, 10, 10},
|
{ok, {{simple_one_for_one, 10, 10},
|
||||||
[{session, {emqttd_session, start_link, [SessOpts]},
|
[{session, {emqttd_session, start_link, []},
|
||||||
transient, 10000, worker, [emqttd_session]}]}}.
|
transient, 10000, worker, [emqttd_session]}]}}.
|
||||||
|
|
||||||
|
|
|
@ -30,10 +30,14 @@
|
||||||
|
|
||||||
-include("emqttd_systop.hrl").
|
-include("emqttd_systop.hrl").
|
||||||
|
|
||||||
|
-include_lib("emqtt/include/emqtt.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
%% statistics API.
|
%% statistics API.
|
||||||
-export([statsfun/1, statsfun/2,
|
-export([statsfun/1, statsfun/2,
|
||||||
getstats/0, getstat/1,
|
getstats/0, getstat/1,
|
||||||
|
@ -45,7 +49,7 @@
|
||||||
|
|
||||||
-define(STATS_TAB, mqtt_stats).
|
-define(STATS_TAB, mqtt_stats).
|
||||||
|
|
||||||
-record(state, {sys_interval, tick_timer}).
|
-record(state, {tick}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -118,13 +122,15 @@ setstats(Stat, MaxStat, Val) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
random:seed(now()),
|
random:seed(now()),
|
||||||
|
{ok, Options} = application:get_env(mqtt_broker),
|
||||||
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
||||||
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
[ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics],
|
||||||
% Create $SYS Topics
|
% Create $SYS Topics
|
||||||
[ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
|
[ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
|
||||||
SysInterval = proplists:get_value(sys_interval, Options, 60),
|
% Tick to publish stats
|
||||||
{ok, #state{}}.
|
Tick = emqttd_tick:new(proplists:get_value(sys_interval, Options, 60)),
|
||||||
|
{ok, #state{tick = Tick}, hibernate}.
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
{reply, error, State}.
|
{reply, error, State}.
|
||||||
|
@ -132,10 +138,9 @@ handle_call(_Request, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(tick, State) ->
|
handle_info(tick, State = #state{tick = Tick}) ->
|
||||||
[publish(systop(Stat), i2b(Val))
|
[publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)],
|
||||||
|| {Stat, Val} <- ets:tab2list(?STATS_TAB)],
|
{noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate};
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -149,4 +154,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
publish(Stat, Val) ->
|
||||||
|
emqttd_pubsub:publish(stats, #mqtt_message{
|
||||||
|
topic = emqtt_topic:systop(Stat),
|
||||||
|
payload = emqttd_utils:integer_to_binary(Val)}).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -20,61 +20,26 @@
|
||||||
%%% SOFTWARE.
|
%%% SOFTWARE.
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%% emqttd config manager.
|
%%% emqttd tick.
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_config).
|
-module(emqttd_tick).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-export([new/1, tick/1]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-record(tick_state, {interval, timer}).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
new(Interval) ->
|
||||||
|
Delay = if
|
||||||
|
Interval == 0 -> 0;
|
||||||
|
true -> random:uniform(Interval)
|
||||||
|
end,
|
||||||
|
tick(Delay, #tick_state{interval = Interval}).
|
||||||
|
|
||||||
%% API Function Exports
|
tick(Tick = #tick_state{interval = Interval}) ->
|
||||||
-export([start_link/0, lookup/1]).
|
tick(Interval, Tick).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
||||||
terminate/2, code_change/3]).
|
|
||||||
|
|
||||||
%%%=============================================================================
|
|
||||||
%%% API
|
|
||||||
%%%=============================================================================
|
|
||||||
|
|
||||||
start_link() ->
|
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
||||||
|
|
||||||
%%TODO: fix later...
|
|
||||||
lookup(Key) -> {ok, Key}.
|
|
||||||
|
|
||||||
%%%=============================================================================
|
|
||||||
%%% gen_server callbacks
|
|
||||||
%%%=============================================================================
|
|
||||||
|
|
||||||
init(_Args) ->
|
|
||||||
%%TODO: Load application config.
|
|
||||||
ets:new(?MODULE, [set, protected, named_table]),
|
|
||||||
{ok, none}.
|
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
|
||||||
{reply, ok, State}.
|
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
%%%=============================================================================
|
|
||||||
%%% Internal functions
|
|
||||||
%%%=============================================================================
|
|
||||||
|
|
||||||
|
tick(Delay, Tick) when is_record(Tick, tick_state) ->
|
||||||
|
Tick#tick_state{timer = erlang:send_after(Delay * 1000, self(), tick)}.
|
||||||
|
|
|
@ -32,6 +32,8 @@
|
||||||
all_module_attributes/1,
|
all_module_attributes/1,
|
||||||
cancel_timer/1]).
|
cancel_timer/1]).
|
||||||
|
|
||||||
|
-export([integer_to_binary/1]).
|
||||||
|
|
||||||
%% only {F, Args}...
|
%% only {F, Args}...
|
||||||
apply_module_attributes(Name) ->
|
apply_module_attributes(Name) ->
|
||||||
[{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
|
[{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
|
||||||
|
@ -84,3 +86,8 @@ cancel_timer(undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
cancel_timer(Ref) ->
|
cancel_timer(Ref) ->
|
||||||
catch erlang:cancel_timer(Ref).
|
catch erlang:cancel_timer(Ref).
|
||||||
|
|
||||||
|
integer_to_binary(I) when is_integer(I) ->
|
||||||
|
list_to_binary(integer_to_list(I)).
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue