diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 40d651959..cdd35250c 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -67,19 +67,17 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - {ok, SessOpts} = application:get_env(mqtt_session), {ok, PubSubOpts} = application:get_env(pubsub), {ok, BrokerOpts} = application:get_env(broker), {ok, MetricOpts} = application:get_env(metrics), {ok, AccessOpts} = application:get_env(access_control), Servers = [ - {"emqttd config", emqttd_config}, {"emqttd event", emqttd_event}, {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", {supervisor, emqttd_cm_sup}}, {"emqttd session manager", emqttd_sm}, - {"emqttd session supervisor", {supervisor, emqttd_session_sup}, SessOpts}, + {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, %{"emqttd router", emqttd_router}, {"emqttd broker", emqttd_broker, BrokerOpts}, diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index dcda649b1..df85e9a71 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -37,7 +37,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/1]). +-export([start_link/0]). -export([all/0, value/1, inc/1, inc/2, inc/3, @@ -50,7 +50,7 @@ -define(METRIC_TAB, mqtt_metric). --record(state, {pub_interval, tick_timer}). +-record(state, {tick}). %%%============================================================================= %%% API @@ -60,9 +60,9 @@ %% @doc Start metrics server %% @end %%------------------------------------------------------------------------------ --spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. -start_link(Options) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). +-spec start_link() -> {ok, pid()} | ignore | {error, term()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% @doc Get all metrics @@ -155,8 +155,9 @@ key(counter, Metric) -> %%% gen_server callbacks %%%============================================================================= -init([Options]) -> +init([]) -> random:seed(now()), + {ok, BrokerOpts} = application:get_env(mqtt_broker), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), @@ -164,12 +165,9 @@ init([Options]) -> [new_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics [ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], - PubInterval = proplists:get_value(pub_interval, Options, 60), - Delay = if - PubInterval == 0 -> 0; - true -> random:uniform(PubInterval) - end, - {ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}. + % Tick to publish stats + Tick = emqttd_tick:new(proplists:get_value(sys_interval, BrokerOpts, 60)), + {ok, #state{tick = Tick}, hibernate}. handle_call(_Req, _From, State) -> {reply, {error, badreq}, State}. @@ -177,10 +175,10 @@ handle_call(_Req, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(tick, State) -> +handle_info(tick, State = #state{tick = Tick}) -> % publish metric message - [publish(systop(Metric), i2b(Val))|| {Metric, Val} <- all()], - {noreply, tick(State), hibernate}; + [publish(Metric, Val) || {Metric, Val} <- all()], + {noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate}; handle_info(_Info, State) -> {noreply, State}. @@ -195,12 +193,10 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -systop(Name) when is_atom(Name) -> - list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). - -publish(Topic, Payload) -> - emqttd_pubsub:publish(metrics, #mqtt_message{topic = Topic, - payload = Payload}). +publish(Metric, Val) -> + emqttd_pubsub:publish(metrics, #mqtt_message{ + topic = emqtt_topic:systop(Metric), + payload = emqttd_utils:integer_to_binary(Val)}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); @@ -209,14 +205,3 @@ new_metric({counter, Name}) -> Schedulers = lists:seq(1, erlang:system_info(schedulers)), [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. -tick(State = #state{pub_interval = PubInterval}) -> - tick(PubInterval, State). - -tick(0, State) -> - State; -tick(Delay, State) -> - State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. - -i2b(I) -> - list_to_binary(integer_to_list(I)). - diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 79969daa1..28ba042e7 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -247,17 +247,18 @@ initial_state(ClientId, ClientPid) -> %% @doc Start a session process. %% @end %%------------------------------------------------------------------------------ -start_link(SessOpts, ClientId, ClientPid) -> - gen_server:start_link(?MODULE, [SessOpts, ClientId, ClientPid], []). +start_link(ClientId, ClientPid) -> + gen_server:start_link(?MODULE, [ClientId, ClientPid], []). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= -init([SessOpts, ClientId, ClientPid]) -> +init([ClientId, ClientPid]) -> process_flag(trap_exit, true), - %%TODO: Is this OK? should monitor... + %%TODO: Is this OK? or should monitor... true = link(ClientPid), + {ok, SessOpts} = application:get_env(mqtt_session), State = initial_state(ClientId, ClientPid), Expires = proplists:get_value(expires, SessOpts, 1) * 3600, MsgQueue = emqttd_queue:new(proplists:get_value(max_queue, SessOpts, 1000), diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index e0f82f083..e1a13f098 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -30,16 +30,22 @@ -behavior(supervisor). --export([start_link/1, start_session/2]). +-export([start_link/0, start_session/2]). -export([init/1]). -%TODO: FIX COMMENTS... - --spec start_link([tuple()]) -> {ok, pid()}. -start_link(SessOpts) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [SessOpts]). +%%------------------------------------------------------------------------------ +%% @doc Start session supervisor +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). +%%------------------------------------------------------------------------------ +%% @doc Start a session +%% @end +%%------------------------------------------------------------------------------ -spec start_session(binary(), pid()) -> {ok, pid()}. start_session(ClientId, ClientPid) -> supervisor:start_child(?MODULE, [ClientId, ClientPid]). @@ -48,8 +54,8 @@ start_session(ClientId, ClientPid) -> %%% Supervisor callbacks %%%============================================================================= -init([SessOpts]) -> +init([]) -> {ok, {{simple_one_for_one, 10, 10}, - [{session, {emqttd_session, start_link, [SessOpts]}, + [{session, {emqttd_session, start_link, []}, transient, 10000, worker, [emqttd_session]}]}}. diff --git a/apps/emqttd/src/emqttd_stats.erl b/apps/emqttd/src/emqttd_stats.erl index c48d7da0e..307ce2af6 100644 --- a/apps/emqttd/src/emqttd_stats.erl +++ b/apps/emqttd/src/emqttd_stats.erl @@ -30,10 +30,14 @@ -include("emqttd_systop.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). + -behaviour(gen_server). -define(SERVER, ?MODULE). +-export([start_link/0]). + %% statistics API. -export([statsfun/1, statsfun/2, getstats/0, getstat/1, @@ -45,7 +49,7 @@ -define(STATS_TAB, mqtt_stats). --record(state, {sys_interval, tick_timer}). +-record(state, {tick}). %%%============================================================================= %%% API @@ -118,13 +122,15 @@ setstats(Stat, MaxStat, Val) -> init([]) -> random:seed(now()), + {ok, Options} = application:get_env(mqtt_broker), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics [ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics], - SysInterval = proplists:get_value(sys_interval, Options, 60), - {ok, #state{}}. + % Tick to publish stats + Tick = emqttd_tick:new(proplists:get_value(sys_interval, Options, 60)), + {ok, #state{tick = Tick}, hibernate}. handle_call(_Request, _From, State) -> {reply, error, State}. @@ -132,10 +138,9 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(tick, State) -> - [publish(systop(Stat), i2b(Val)) - || {Stat, Val} <- ets:tab2list(?STATS_TAB)], - {noreply, State}; +handle_info(tick, State = #state{tick = Tick}) -> + [publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)], + {noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate}; handle_info(_Info, State) -> {noreply, State}. @@ -149,4 +154,9 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= +publish(Stat, Val) -> + emqttd_pubsub:publish(stats, #mqtt_message{ + topic = emqtt_topic:systop(Stat), + payload = emqttd_utils:integer_to_binary(Val)}). + diff --git a/apps/emqttd/src/emqttd_config.erl b/apps/emqttd/src/emqttd_tick.erl similarity index 50% rename from apps/emqttd/src/emqttd_config.erl rename to apps/emqttd/src/emqttd_tick.erl index 38d6e6794..e16c47bb6 100644 --- a/apps/emqttd/src/emqttd_config.erl +++ b/apps/emqttd/src/emqttd_tick.erl @@ -20,61 +20,26 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd config manager. +%%% emqttd tick. %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_config). +-module(emqttd_tick). --author("Feng Lee "). +-export([new/1, tick/1]). --define(SERVER, ?MODULE). +-record(tick_state, {interval, timer}). --behaviour(gen_server). +new(Interval) -> + Delay = if + Interval == 0 -> 0; + true -> random:uniform(Interval) + end, + tick(Delay, #tick_state{interval = Interval}). -%% API Function Exports --export([start_link/0, lookup/1]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%%%============================================================================= -%%% API -%%%============================================================================= - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%TODO: fix later... -lookup(Key) -> {ok, Key}. - -%%%============================================================================= -%%% gen_server callbacks -%%%============================================================================= - -init(_Args) -> - %%TODO: Load application config. - ets:new(?MODULE, [set, protected, named_table]), - {ok, none}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%============================================================================= -%%% Internal functions -%%%============================================================================= +tick(Tick = #tick_state{interval = Interval}) -> + tick(Interval, Tick). +tick(Delay, Tick) when is_record(Tick, tick_state) -> + Tick#tick_state{timer = erlang:send_after(Delay * 1000, self(), tick)}. diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_utils.erl index d9a76380a..a5a94b931 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_utils.erl @@ -32,6 +32,8 @@ all_module_attributes/1, cancel_timer/1]). +-export([integer_to_binary/1]). + %% only {F, Args}... apply_module_attributes(Name) -> [{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} || @@ -84,3 +86,8 @@ cancel_timer(undefined) -> undefined; cancel_timer(Ref) -> catch erlang:cancel_timer(Ref). + +integer_to_binary(I) when is_integer(I) -> + list_to_binary(integer_to_list(I)). + +