Merge pull request #12744 from JimMoen/EMQX-12046/fix-otel-cpu-sup

fix(otel): cpu usage/idle metrics for opentelemetry
This commit is contained in:
JimMoen 2024-03-20 17:52:29 +08:00 committed by GitHub
commit 5390203184
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 160 additions and 4 deletions

View File

@ -124,9 +124,12 @@ ensure_otel_metrics(
) ->
ok;
ensure_otel_metrics(#{metrics := #{enable := true}} = Conf, _Old) ->
ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(),
_ = emqx_otel_cpu_sup:start_otel_cpu_sup(Conf),
_ = emqx_otel_metrics:stop_otel(),
emqx_otel_metrics:start_otel(Conf);
ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) ->
ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(),
emqx_otel_metrics:stop_otel();
ensure_otel_metrics(_, _) ->
ok.

View File

@ -0,0 +1,146 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel_cpu_sup).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
%% gen_server APIs
-export([start_link/1]).
-export([
start_otel_cpu_sup/1,
stop_otel_cpu_sup/0,
stats/1
]).
%% gen_server callbacks
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-define(REFRESH, refresh).
-define(OTEL_CPU_USAGE_WORKER, ?MODULE).
-define(SUPERVISOR, emqx_otel_sup).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_otel_cpu_sup(Conf) ->
Spec = emqx_otel_sup:worker_spec(?MODULE, Conf),
assert_started(supervisor:start_child(?SUPERVISOR, Spec)).
stop_otel_cpu_sup() ->
case erlang:whereis(?SUPERVISOR) of
undefined ->
ok;
Pid ->
case supervisor:terminate_child(Pid, ?MODULE) of
ok -> supervisor:delete_child(Pid, ?MODULE);
{error, not_found} -> ok;
Error -> Error
end
end.
stats(Name) ->
gen_server:call(?OTEL_CPU_USAGE_WORKER, {?FUNCTION_NAME, Name}, infinity).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% simply handle cpu_sup:util/0,1 called in one process
%%--------------------------------------------------------------------
start_link(Conf) ->
gen_server:start_link({local, ?OTEL_CPU_USAGE_WORKER}, ?MODULE, Conf, []).
init(Conf) ->
{ok, _InitState = #{}, {continue, {setup, Conf}}}.
%% Interval in milliseconds
handle_continue({setup, #{metrics := #{enable := true, interval := Interval}}}, State) ->
%% start os_mon temporarily
{ok, _} = application:ensure_all_started(os_mon),
%% The returned value of the first call to cpu_sup:util/0 or cpu_sup:util/1 by a
%% process will on most systems be the CPU utilization since system boot,
%% but this is not guaranteed and the value should therefore be regarded as garbage.
%% This also applies to the first call after a restart of cpu_sup.
_Val = cpu_sup:util(),
TRef = start_refresh_timer(Interval),
{noreply, State#{interval => Interval, refresh_time_ref => TRef}}.
handle_call({stats, Name}, _From, State) ->
{reply, get_stats(Name, State), State};
handle_call(stop, _From, State) ->
cancel_outdated_timer(State),
{stop, normal, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, State}.
handle_info({timeout, _Timer, ?REFRESH}, State) ->
{noreply, refresh(State)}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
refresh(#{interval := Interval} = State) ->
NState =
case cpu_sup:util([]) of
{all, U, I, _} ->
State#{'cpu.use' => U, 'cpu.idle' => I};
_ ->
State#{'cpu.use' => 0, 'cpu.idle' => 0}
end,
TRef = start_refresh_timer(Interval),
NState#{refresh_time_ref => TRef}.
get_stats(Name, State) ->
maps:get(Name, State, 0).
cancel_outdated_timer(#{refresh_time_ref := TRef}) ->
emqx_utils:cancel_timer(TRef),
ok.
start_refresh_timer(Interval) ->
start_timer(Interval, ?REFRESH).
start_timer(Interval, Msg) ->
emqx_utils:start_timer(Interval, Msg).
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_started, _Pid}}) -> ok;
assert_started({error, Reason}) -> {error, Reason}.

View File

@ -197,6 +197,10 @@ bytes_metrics() ->
get_stats_gauge(Name) ->
[{emqx_stats:getstat(Name), #{}}].
get_vm_gauge('cpu.use') ->
[{emqx_otel_cpu_sup:stats('cpu.use'), #{}}];
get_vm_gauge('cpu.idle') ->
[{emqx_otel_cpu_sup:stats('cpu.idle'), #{}}];
get_vm_gauge(Name) ->
[{emqx_mgmt:vm_stats(Name), #{}}].
@ -254,8 +258,6 @@ create_counter(Meter, Counters, CallBack) ->
Counters
).
%% Note: list_to_existing_atom("cpu.use") will crash
%% so we make sure the atom is already existing here
normalize_name(cpu_use) ->
'cpu.use';
normalize_name(cpu_idle) ->

View File

@ -42,7 +42,12 @@ init([]) ->
},
Children =
case emqx_conf:get([opentelemetry]) of
#{metrics := #{enable := false}} -> [];
#{metrics := #{enable := true}} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
#{metrics := #{enable := false}} ->
[];
#{metrics := #{enable := true}} = Conf ->
[
worker_spec(emqx_otel_metrics, Conf),
worker_spec(emqx_otel_cpu_sup, Conf)
]
end,
{ok, {SupFlags, Children}}.