226 lines
6.9 KiB
Erlang
226 lines
6.9 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2018-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_sys).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("emqx.hrl").
|
|
-include("types.hrl").
|
|
-include("logger.hrl").
|
|
|
|
-logger_header("[SYS]").
|
|
|
|
-export([ start_link/0
|
|
, stop/0
|
|
]).
|
|
|
|
-export([ version/0
|
|
, uptime/0
|
|
, datetime/0
|
|
, sysdescr/0
|
|
, sys_interval/0
|
|
, sys_heatbeat_interval/0
|
|
]).
|
|
|
|
-export([info/0]).
|
|
|
|
%% gen_server callbacks
|
|
-export([ init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
]).
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-import(emqx_topic, [systop/1]).
|
|
-import(emqx_misc, [start_timer/2]).
|
|
|
|
-record(state,
|
|
{ start_time :: erlang:timestamp()
|
|
, heartbeat :: maybe(reference())
|
|
, ticker :: maybe(reference())
|
|
, version :: binary()
|
|
, sysdescr :: binary()
|
|
}).
|
|
|
|
-define(APP, emqx).
|
|
-define(SYS, ?MODULE).
|
|
|
|
-define(INFO_KEYS,
|
|
[ version % Broker version
|
|
, uptime % Broker uptime
|
|
, datetime % Broker local datetime
|
|
, sysdescr % Broker description
|
|
]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% APIs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
|
|
start_link() ->
|
|
gen_server:start_link({local, ?SYS}, ?MODULE, [], []).
|
|
|
|
stop() ->
|
|
gen_server:stop(?SYS).
|
|
|
|
%% @doc Get sys version
|
|
-spec(version() -> string()).
|
|
version() -> emqx_app:get_release().
|
|
|
|
%% @doc Get sys description
|
|
-spec(sysdescr() -> string()).
|
|
sysdescr() -> emqx_app:get_description().
|
|
|
|
%% @doc Get sys uptime
|
|
-spec(uptime() -> string()).
|
|
uptime() ->
|
|
gen_server:call(?SYS, uptime).
|
|
|
|
%% @doc Get sys datetime
|
|
-spec(datetime() -> string()).
|
|
datetime() ->
|
|
{{Y, M, D}, {H, MM, S}} = calendar:local_time(),
|
|
lists:flatten(
|
|
io_lib:format(
|
|
"~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
|
|
|
|
%% @doc Get sys interval
|
|
-spec(sys_interval() -> pos_integer()).
|
|
sys_interval() ->
|
|
emqx:get_env(broker_sys_interval, 60000).
|
|
|
|
%% @doc Get sys heatbeat interval
|
|
-spec(sys_heatbeat_interval() -> pos_integer()).
|
|
sys_heatbeat_interval() ->
|
|
emqx:get_env(broker_sys_heartbeat, 30000).
|
|
|
|
%% @doc Get sys info
|
|
-spec(info() -> list(tuple())).
|
|
info() ->
|
|
[{version, version()},
|
|
{sysdescr, sysdescr()},
|
|
{uptime, uptime()},
|
|
{datetime, datetime()}].
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init([]) ->
|
|
State = #state{start_time = erlang:timestamp(),
|
|
version = iolist_to_binary(version()),
|
|
sysdescr = iolist_to_binary(sysdescr())},
|
|
{ok, heartbeat(tick(State))}.
|
|
|
|
heartbeat(State) ->
|
|
State#state{heartbeat = start_timer(sys_heatbeat_interval(), heartbeat)}.
|
|
tick(State) ->
|
|
State#state{ticker = start_timer(sys_interval(), tick)}.
|
|
|
|
handle_call(uptime, _From, State) ->
|
|
{reply, uptime(State), State};
|
|
|
|
handle_call(Req, _From, State) ->
|
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
|
{reply, ignored, State}.
|
|
|
|
handle_cast(Msg, State) ->
|
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
{noreply, State}.
|
|
|
|
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
|
publish_any(uptime, iolist_to_binary(uptime(State))),
|
|
publish_any(datetime, iolist_to_binary(datetime())),
|
|
{noreply, heartbeat(State)};
|
|
|
|
handle_info({timeout, TRef, tick},
|
|
State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
|
|
publish_any(version, Version),
|
|
publish_any(sysdescr, Descr),
|
|
publish_any(brokers, ekka_mnesia:running_nodes()),
|
|
publish_any(stats, emqx_stats:getstats()),
|
|
publish_any(metrics, emqx_metrics:all()),
|
|
{noreply, tick(State), hibernate};
|
|
|
|
handle_info(Info, State) ->
|
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
{noreply, State}.
|
|
|
|
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
|
|
lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
|
|
|
|
%%-----------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%-----------------------------------------------------------------------------
|
|
|
|
uptime(#state{start_time = Ts}) ->
|
|
Secs = timer:now_diff(erlang:timestamp(), Ts) div 1000000,
|
|
lists:flatten(uptime(seconds, Secs)).
|
|
uptime(seconds, Secs) when Secs < 60 ->
|
|
[integer_to_list(Secs), " seconds"];
|
|
uptime(seconds, Secs) ->
|
|
[uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"];
|
|
uptime(minutes, M) when M < 60 ->
|
|
[integer_to_list(M), " minutes, "];
|
|
uptime(minutes, M) ->
|
|
[uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "];
|
|
uptime(hours, H) when H < 24 ->
|
|
[integer_to_list(H), " hours, "];
|
|
uptime(hours, H) ->
|
|
[uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
|
|
uptime(days, D) ->
|
|
[integer_to_list(D), " days, "].
|
|
|
|
publish_any(Name, Value) ->
|
|
_ = publish(Name, Value),
|
|
ok.
|
|
|
|
publish(uptime, Uptime) ->
|
|
safe_publish(systop(uptime), Uptime);
|
|
publish(datetime, Datetime) ->
|
|
safe_publish(systop(datetime), Datetime);
|
|
publish(version, Version) ->
|
|
safe_publish(systop(version), #{retain => true}, Version);
|
|
publish(sysdescr, Descr) ->
|
|
safe_publish(systop(sysdescr), #{retain => true}, Descr);
|
|
publish(brokers, Nodes) ->
|
|
Payload = string:join([atom_to_list(N) || N <- Nodes], ","),
|
|
safe_publish(<<"$SYS/brokers">>, #{retain => true}, Payload);
|
|
publish(stats, Stats) ->
|
|
[safe_publish(systop(lists:concat(['stats/', Stat])), integer_to_binary(Val))
|
|
|| {Stat, Val} <- Stats, is_atom(Stat), is_integer(Val)];
|
|
publish(metrics, Metrics) ->
|
|
[safe_publish(systop(metric_topic(Name)), integer_to_binary(Val))
|
|
|| {Name, Val} <- Metrics, is_atom(Name), is_integer(Val)].
|
|
|
|
metric_topic(Name) ->
|
|
lists:concat(["metrics/", string:replace(atom_to_list(Name), ".", "/", all)]).
|
|
|
|
safe_publish(Topic, Payload) ->
|
|
safe_publish(Topic, #{}, Payload).
|
|
safe_publish(Topic, Flags, Payload) ->
|
|
emqx_broker:safe_publish(
|
|
emqx_message:set_flags(
|
|
maps:merge(#{sys => true}, Flags),
|
|
emqx_message:make(?SYS, Topic, iolist_to_binary(Payload)))).
|