%%-------------------------------------------------------------------- %% Copyright (c) 2012-2017 Feng Lee . %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqttd_broker). -behaviour(gen_server). -include("emqttd.hrl"). -include("emqttd_internal.hrl"). %% API Function Exports -export([start_link/0]). %% Event API -export([subscribe/1, notify/2]). %% Broker API -export([version/0, uptime/0, datetime/0, sysdescr/0]). %% Tick API -export([start_tick/1, stop_tick/1]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, {started_at, sys_interval, heartbeat, tick_tref, version, sysdescr}). -define(SERVER, ?MODULE). -define(BROKER_TAB, mqtt_broker). %% $SYS Topics of Broker -define(SYSTOP_BROKERS, [ version, % Broker version uptime, % Broker uptime datetime, % Broker local datetime sysdescr % Broker description ]). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- %% @doc Start emqttd broker -spec(start_link() -> {ok, pid()} | ignore | {error, any()}). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %% @doc Subscribe broker event -spec(subscribe(EventType :: any()) -> ok). subscribe(EventType) -> gproc:reg({p, l, {broker, EventType}}). %% @doc Notify broker event -spec(notify(EventType :: any(), Event :: any()) -> ok). notify(EventType, Event) -> gproc:send({p, l, {broker, EventType}}, {notify, EventType, self(), Event}). %% @doc Get broker version -spec(version() -> string()). version() -> {ok, Version} = application:get_key(emqttd, vsn), Version. %% @doc Get broker description -spec(sysdescr() -> string()). sysdescr() -> {ok, Descr} = application:get_key(emqttd, description), Descr. %% @doc Get broker uptime -spec(uptime() -> string()). uptime() -> gen_server:call(?SERVER, uptime). %% @doc Get broker datetime -spec(datetime() -> string()). datetime() -> {{Y, M, D}, {H, MM, S}} = calendar:local_time(), lists:flatten( io_lib:format( "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])). %% @doc Start a tick timer start_tick(Msg) -> start_tick(timer:seconds(emqttd:env(broker_sys_interval, 60)), Msg). start_tick(0, _Msg) -> undefined; start_tick(Interval, Msg) when Interval > 0 -> {ok, TRef} = timer:send_interval(Interval, Msg), TRef. %% @doc Start tick timer stop_tick(undefined) -> ok; stop_tick(TRef) -> timer:cancel(TRef). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> emqttd_time:seed(), ets:new(?BROKER_TAB, [set, public, named_table]), % Tick {ok, #state{started_at = os:timestamp(), heartbeat = start_tick(1000, heartbeat), version = list_to_binary(version()), sysdescr = list_to_binary(sysdescr()), tick_tref = start_tick(tick)}, hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). handle_info(heartbeat, State) -> publish(uptime, list_to_binary(uptime(State))), publish(datetime, list_to_binary(datetime())), {noreply, State, hibernate}; handle_info(tick, State = #state{version = Version, sysdescr = Descr}) -> retain(brokers), retain(version, Version), retain(sysdescr, Descr), {noreply, State, hibernate}; handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), stop_tick(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- emqttd_mnesia:running_nodes()], ",")), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)), emqttd:publish(Msg1). retain(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)), emqttd:publish(Msg1). publish(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), emqttd:publish(emqttd_message:set_flag(sys, Msg)). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, lists:flatten(uptime(seconds, Secs)). uptime(seconds, Secs) when Secs < 60 -> [integer_to_list(Secs), " seconds"]; uptime(seconds, Secs) -> [uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"]; uptime(minutes, M) when M < 60 -> [integer_to_list(M), " minutes, "]; uptime(minutes, M) -> [uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "]; uptime(hours, H) when H < 24 -> [integer_to_list(H), " hours, "]; uptime(hours, H) -> [uptime(days, H div 24), integer_to_list(H rem 24), " hours, "]; uptime(days, D) -> [integer_to_list(D), " days,"].