emqx/src/emqx_sys.erl

226 lines
6.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sys).
-behaviour(gen_server).
-include("emqx.hrl").
-include("types.hrl").
-include("logger.hrl").
-logger_header("[SYS]").
-export([ start_link/0
, stop/0
]).
-export([ version/0
, uptime/0
, datetime/0
, sysdescr/0
, sys_interval/0
, sys_heatbeat_interval/0
]).
-export([info/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-import(emqx_topic, [systop/1]).
-import(emqx_misc, [start_timer/2]).
-record(state,
{ start_time :: erlang:timestamp()
, heartbeat :: maybe(reference())
, ticker :: maybe(reference())
, version :: binary()
, sysdescr :: binary()
}).
-define(APP, emqx).
-define(SYS, ?MODULE).
-define(INFO_KEYS,
[ version % Broker version
, uptime % Broker uptime
, datetime % Broker local datetime
, sysdescr % Broker description
]).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
gen_server:start_link({local, ?SYS}, ?MODULE, [], []).
stop() ->
gen_server:stop(?SYS).
%% @doc Get sys version
-spec(version() -> string()).
version() -> emqx_app:get_release().
%% @doc Get sys description
-spec(sysdescr() -> string()).
sysdescr() -> emqx_app:get_description().
%% @doc Get sys uptime
-spec(uptime() -> string()).
uptime() ->
gen_server:call(?SYS, uptime).
%% @doc Get sys datetime
-spec(datetime() -> string()).
datetime() ->
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
lists:flatten(
io_lib:format(
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
%% @doc Get sys interval
-spec(sys_interval() -> pos_integer()).
sys_interval() ->
emqx:get_env(broker_sys_interval, 60000).
%% @doc Get sys heatbeat interval
-spec(sys_heatbeat_interval() -> pos_integer()).
sys_heatbeat_interval() ->
emqx:get_env(broker_sys_heartbeat, 30000).
%% @doc Get sys info
-spec(info() -> list(tuple())).
info() ->
[{version, version()},
{sysdescr, sysdescr()},
{uptime, uptime()},
{datetime, datetime()}].
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
State = #state{start_time = erlang:timestamp(),
version = iolist_to_binary(version()),
sysdescr = iolist_to_binary(sysdescr())},
{ok, heartbeat(tick(State))}.
heartbeat(State) ->
State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}.
tick(State) ->
State#state{ticker = start_timer(sys_interval(), tick)}.
handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
publish_any(uptime, iolist_to_binary(uptime(State))),
publish_any(datetime, iolist_to_binary(datetime())),
{noreply, heartbeat(State)};
handle_info({timeout, TRef, tick},
State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
publish_any(version, Version),
publish_any(sysdescr, Descr),
publish_any(brokers, ekka_mnesia:running_nodes()),
publish_any(stats, emqx_stats:getstats()),
publish_any(metrics, emqx_metrics:all()),
{noreply, tick(State), hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
%%-----------------------------------------------------------------------------
%% Internal functions
%%-----------------------------------------------------------------------------
uptime(#state{start_time = Ts}) ->
Secs = timer:now_diff(erlang:timestamp(), Ts) div 1000000,
lists:flatten(uptime(seconds, Secs)).
uptime(seconds, Secs) when Secs < 60 ->
[integer_to_list(Secs), " seconds"];
uptime(seconds, Secs) ->
[uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"];
uptime(minutes, M) when M < 60 ->
[integer_to_list(M), " minutes, "];
uptime(minutes, M) ->
[uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "];
uptime(hours, H) when H < 24 ->
[integer_to_list(H), " hours, "];
uptime(hours, H) ->
[uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
uptime(days, D) ->
[integer_to_list(D), " days, "].
publish_any(Name, Value) ->
_ = publish(Name, Value),
ok.
publish(uptime, Uptime) ->
safe_publish(systop(uptime), Uptime);
publish(datetime, Datetime) ->
safe_publish(systop(datetime), Datetime);
publish(version, Version) ->
safe_publish(systop(version), #{retain => true}, Version);
publish(sysdescr, Descr) ->
safe_publish(systop(sysdescr), #{retain => true}, Descr);
publish(brokers, Nodes) ->
Payload = string:join([atom_to_list(N) || N <- Nodes], ","),
safe_publish(<<"$SYS/brokers">>, #{retain => true}, Payload);
publish(stats, Stats) ->
[safe_publish(systop(lists:concat(['stats/', Stat])), integer_to_binary(Val))
|| {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
publish(metrics, Metrics) ->
[safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
|| {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
metric_topic(Name) ->
lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
safe_publish(Topic, Payload) ->
safe_publish(Topic, #{}, Payload).
safe_publish(Topic, Flags, Payload) ->
emqx_broker:safe_publish(
emqx_message:set_flags(
maps:merge(#{sys => true}, Flags),
emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))).