start_tick, stop_tick

This commit is contained in:
Feng Lee 2015-04-29 16:23:26 +08:00
parent 2db943f5e8
commit 5a43afd07f
3 changed files with 32 additions and 13 deletions

View File

@ -45,6 +45,9 @@
%% Broker API %% Broker API
-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). -export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]).
%% Tick API
-export([start_tick/1, stop_tick/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -124,6 +127,27 @@ datetime() ->
io_lib:format( io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%%------------------------------------------------------------------------------
%% @doc Start a tick timer
%% @end
%%------------------------------------------------------------------------------
start_tick(Msg) ->
start_tick(timer:seconds(env(sys_interval)), Msg).
start_tick(0, _Msg) ->
undefined;
start_tick(Interval, Msg) when Interval > 0 ->
{ok, TRef} = timer:send_interval(Interval, Msg), TRef.
%%------------------------------------------------------------------------------
%% @doc Start tick timer
%% @end
%%------------------------------------------------------------------------------
stop_tick(undefined) ->
ok;
stop_tick(TRef) ->
timer:cancel(TRef).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
@ -134,10 +158,7 @@ init([]) ->
% Create $SYS Topics % Create $SYS Topics
[ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
% Tick % Tick
SysInterval = env(sys_interval), {ok, #state{started_at = os:timestamp(), tick_tref = start_tick(tick)}, hibernate}.
{ok, TRef} = timer:send_interval(timer:seconds(SysInterval), tick),
State = #state{started_at = os:timestamp(), sys_interval = SysInterval, tick_tref = TRef},
{ok, State, hibernate}.
handle_call(uptime, _From, State) -> handle_call(uptime, _From, State) ->
{reply, uptime(State), State}; {reply, uptime(State), State};
@ -158,8 +179,8 @@ handle_info(tick, State) ->
handle_info(_Info, State) -> handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, #state{tick_tref = TRef}) ->
ok. stop_tick(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

View File

@ -165,8 +165,7 @@ init([]) ->
% $SYS Topics for metrics % $SYS Topics for metrics
[ok = create_topic(Topic) || {_, Topic} <- Metrics], [ok = create_topic(Topic) || {_, Topic} <- Metrics],
% Tick to publish metrics % Tick to publish metrics
{ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick), {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
{ok, #state{tick_tref = TRef}, hibernate}.
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, error, State}. {reply, error, State}.
@ -183,7 +182,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{tick_tref = TRef}) -> terminate(_Reason, #state{tick_tref = TRef}) ->
timer:cancel(TRef), ok. emqttd_broker:stop_tick(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

View File

@ -128,8 +128,7 @@ init([]) ->
% Create $SYS Topics % Create $SYS Topics
[ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics], [ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics],
% Tick to publish stats % Tick to publish stats
{ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick), {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
{ok, #state{tick_tref = TRef}, hibernate}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, error, State}. {reply, error, State}.
@ -137,6 +136,7 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
%% Interval Tick.
handle_info(tick, State) -> handle_info(tick, State) ->
[publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)], [publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)],
{noreply, State, hibernate}; {noreply, State, hibernate};
@ -145,7 +145,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{tick_tref = TRef}) -> terminate(_Reason, #state{tick_tref = TRef}) ->
timer:cancel(TRef), ok. emqttd_broker:stop_tick(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -157,4 +157,3 @@ publish(Stat, Val) ->
emqttd_pubsub:publish(stats, #mqtt_message{topic = emqtt_topic:systop(Stat), emqttd_pubsub:publish(stats, #mqtt_message{topic = emqtt_topic:systop(Stat),
payload = emqttd_util:integer_to_binary(Val)}). payload = emqttd_util:integer_to_binary(Val)}).