emqx/apps/emqx_statsd/src/emqx_statsd.erl

172 lines
5.7 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd).
-behaviour(gen_server).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-include("emqx_statsd.hrl").
-include_lib("emqx/include/logger.hrl").
-export([ update/1
, start/0
, stop/0
, restart/0
%% for rpc
, do_start/0
, do_stop/0
, do_restart/0
]).
%% Interface
-export([start_link/1]).
%% Internal Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, code_change/3
, terminate/2
]).
%% gen_server state of the statsd reporter process.
-record(state, {
%% Reference of the pending sample timer, as returned by
%% emqx_misc:start_timer/2 in ensure_timer/1; undefined until first armed.
timer :: reference() | undefined,
%% Delay handed to emqx_misc:start_timer/2 between two samples
%% (presumably milliseconds -- TODO confirm against emqx_misc).
sample_time_interval :: pos_integer(),
%% Flush interval; only used here as the divisor when computing the sample rate.
flush_time_interval :: pos_integer(),
%% Pid of the linked estatsd process started in init/1.
estatsd_pid :: pid()
}).
%% @doc Apply a new `statsd' configuration through emqx_conf and bounce the
%% reporter accordingly.
%%
%% On a successful config update the reporter is stopped via stop/0 and, unless
%% the submitted `Config' carries `<<"enable">> => false', started again via
%% start/0. Returns `{ok, RawConfig}' with the raw config reported by
%% emqx_conf:update/3, or `{error, Reason}' when the update itself is rejected.
%% Crashes (badmatch / error raised by check_multicall_result/1) if stopping or
%% starting does not succeed.
update(Config) ->
    UpdateOpts = #{rawconf_with_defaults => true, override_to => cluster},
    case emqx_conf:update([statsd], Config, UpdateOpts) of
        {error, Reason} ->
            {error, Reason};
        {ok, #{raw_config := NewConfigRows}} ->
            ok = stop(),
            %% The decision is taken from the submitted config, where a missing
            %% `enable' key counts as enabled.
            case maps:get(<<"enable">>, Config, true) of
                false ->
                    ignore;
                true ->
                    ok = start()
            end,
            {ok, NewConfigRows}
    end.
%% @doc Issue the `start' call of emqx_statsd_proto_v1 against the nodes
%% returned by mria_mnesia:running_nodes/0. Returns `ok' when every node
%% answered `ok'; otherwise raises the error produced by
%% check_multicall_result/1.
start() ->
    Nodes = mria_mnesia:running_nodes(),
    MulticallResult = emqx_statsd_proto_v1:start(Nodes),
    check_multicall_result(MulticallResult).
%% @doc Issue the `stop' call of emqx_statsd_proto_v1 against the nodes
%% returned by mria_mnesia:running_nodes/0. Returns `ok' when every node
%% answered `ok'; otherwise raises the error produced by
%% check_multicall_result/1.
stop() ->
    Nodes = mria_mnesia:running_nodes(),
    MulticallResult = emqx_statsd_proto_v1:stop(Nodes),
    check_multicall_result(MulticallResult).
%% @doc Issue the `restart' call of emqx_statsd_proto_v1 against the nodes
%% returned by mria_mnesia:running_nodes/0. Returns `ok' when every node
%% answered `ok'; otherwise raises the error produced by
%% check_multicall_result/1.
restart() ->
    Nodes = mria_mnesia:running_nodes(),
    MulticallResult = emqx_statsd_proto_v1:restart(Nodes),
    check_multicall_result(MulticallResult).
%% @doc Node-local start (exported for rpc): reads the `[statsd]' config
%% (defaulting to an empty map) and asks emqx_statsd_sup to ensure the ?APP
%% child is started with it. Returns whatever the supervisor helper returns.
do_start() ->
    StatsdConf = emqx_conf:get([statsd], #{}),
    emqx_statsd_sup:ensure_child_started(?APP, StatsdConf).
%% @doc Node-local stop (exported for rpc): asks emqx_statsd_sup to ensure the
%% ?APP child is stopped. Returns whatever the supervisor helper returns.
do_stop() ->
    ChildId = ?APP,
    emqx_statsd_sup:ensure_child_stopped(ChildId).
%% @doc Node-local restart (exported for rpc): do_stop/0 followed by
%% do_start/0. Returns `ok'; crashes with badmatch if either step returns
%% anything other than `ok'.
do_restart() ->
    ok = do_stop(),
    %% The successful match itself evaluates to `ok', which is the result.
    ok = do_start().
%% @doc Start the reporter as a linked gen_server registered locally under the
%% module name. `Opts' is handed to init/1 wrapped in a one-element list.
start_link(Opts) ->
    ServerName = {local, ?MODULE},
    InitArgs = [Opts],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%% @doc gen_server init callback.
%%
%% Traps exits (so the death of the linked estatsd process arrives as an
%% 'EXIT' message), starts estatsd with the options derived from `Opts', and
%% arms the first sample timer.
init([Opts]) ->
    process_flag(trap_exit, true),
    Tags = tags(maps:get(tags, Opts, #{})),
    {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
    %% estatsd receives everything in Opts except the two interval settings,
    %% with tags/host/port/prefix overridden by the values computed here.
    WithOverrides = Opts#{
        tags => Tags,
        host => Host,
        port => Port,
        prefix => <<"emqx">>
    },
    EstatsdOpts = maps:without([sample_time_interval, flush_time_interval], WithOverrides),
    {ok, Pid} = estatsd:start_link(maps:to_list(EstatsdOpts)),
    %% NOTE(review): the sample interval falls back to
    %% ?DEFAULT_FLUSH_TIME_INTERVAL -- confirm whether a dedicated sample
    %% default was intended.
    SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
    FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
    InitialState = #state{
        sample_time_interval = SampleTimeInterval,
        flush_time_interval = FlushTimeInterval,
        estatsd_pid = Pid
    },
    {ok, ensure_timer(InitialState)}.
%% @doc gen_server call callback. This server defines no synchronous requests,
%% so every call is answered with `ignored' and the state is left untouched.
%%
%% A reply is always sent: returning `noreply' without a later
%% gen_server:reply/2 would leave the caller blocked until its call timeout.
handle_call(_Req, _From, State) ->
    {reply, ignored, State}.
%% @doc gen_server cast callback. No casts are defined for this server; any
%% cast is dropped and the state is returned unchanged.
handle_cast(_Unexpected, Unchanged) ->
    {noreply, Unchanged}.
%% @doc gen_server info callback.
%%
%% * `{timeout, Ref, sample_timeout}' whose `Ref' is the currently armed timer:
%%   collects emqx metrics, stats and VM data, submits them to estatsd as
%%   gauges and re-arms the timer.
%% * `{'EXIT', Pid, Error}' from the estatsd process: stops this server with
%%   reason `{shutdown, Error}'.
%% * anything else is ignored.
handle_info(
    {timeout, Ref, sample_timeout},
    #state{timer = Ref} = State
) ->
    #state{
        sample_time_interval = SampleTimeInterval,
        flush_time_interval = FlushTimeInterval,
        estatsd_pid = Pid
    } = State,
    Counters = emqx_metrics:all(),
    Stats = emqx_stats:getstats(),
    VmData = emqx_vm_data(),
    Metrics = Counters ++ Stats ++ VmData,
    SampleRate = SampleTimeInterval / FlushTimeInterval,
    %% Only `{Name, Value}' pairs become gauges; other elements are skipped by
    %% the generator pattern.
    StatsdMetrics = [
        {gauge, trans_metrics_name(Name), Value, SampleRate, []}
     || {Name, Value} <- Metrics
    ],
    estatsd:submit(Pid, StatsdMetrics),
    {noreply, ensure_timer(State)};
handle_info({'EXIT', Pid, Error}, #state{estatsd_pid = Pid} = State) ->
    {stop, {shutdown, Error}, State};
handle_info(_Other, State) ->
    {noreply, State}.
%% @doc gen_server code-change callback; the state needs no conversion and is
%% returned as is.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
%% @doc gen_server terminate callback: stops the estatsd process owned by this
%% server and returns `ok' regardless of what estatsd:stop/1 returns.
terminate(_Why, #state{estatsd_pid = EstatsdPid}) ->
    _ = estatsd:stop(EstatsdPid),
    ok.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%% Prefix a metric name (an atom) with "emqx." and return the result as an
%% atom, e.g. `foo' becomes `'emqx.foo''.
trans_metrics_name(Name) ->
    binary_to_atom(<<"emqx.", (atom_to_binary(Name, utf8))/binary>>, utf8).
%% Collect VM-level gauges: scheduler run queue length, CPU idle / use figures
%% derived from cpu_sup:util([detailed]), followed by whatever
%% emqx_vm:mem_info/0 returns.
emqx_vm_data() ->
    Idle =
        case cpu_sup:util([detailed]) of
            %% Busy/non-busy both 0: detailed utilisation is unavailable
            %% (original note: not supported on Windows).
            {_, 0, 0, _} ->
                0;
            {_Cpus, _Busy, NonBusy, _Misc} ->
                proplists:get_value(idle, NonBusy, 0)
        end,
    RunQueue = erlang:statistics(run_queue),
    [
        {run_queue, RunQueue},
        {cpu_idle, Idle},
        {cpu_use, 100 - Idle}
        | emqx_vm:mem_info()
    ].
%% Turn a tag map with atom keys into a list of `{BinaryKey, Value}' pairs,
%% in the order produced by maps:to_list/1.
tags(Map) ->
    ToBinaryKey = fun({Key, Value}) -> {atom_to_binary(Key, utf8), Value} end,
    lists:map(ToBinaryKey, maps:to_list(Map)).
%% Arm a new `sample_timeout' timer for the configured sample interval and
%% store its reference in the state.
ensure_timer(#state{sample_time_interval = Interval} = State) ->
    TimerRef = emqx_misc:start_timer(Interval, sample_timeout),
    State#state{timer = TimerRef}.
%% Validate a `{Results, BadNodes}' multicall result.
%%
%% Returns `ok' when no node failed and every result is `ok'.
%% Raises `error({bad_result, Results})' when no node failed but at least one
%% result is not `ok', and `error(multicall_failed)' when the bad-node list is
%% not empty.
check_multicall_result({Replies, []}) ->
    AllOk = lists:all(fun(Reply) -> Reply =:= ok end, Replies),
    AllOk orelse error({bad_result, Replies}),
    ok;
check_multicall_result({_Replies, _BadNodes}) ->
    error(multicall_failed).