diff --git a/apps/emqx_opentelemetry/src/emqx_otel_config.erl b/apps/emqx_opentelemetry/src/emqx_otel_config.erl index 62d9427cd..6817df7d1 100644 --- a/apps/emqx_opentelemetry/src/emqx_otel_config.erl +++ b/apps/emqx_opentelemetry/src/emqx_otel_config.erl @@ -124,9 +124,12 @@ ensure_otel_metrics( ) -> ok; ensure_otel_metrics(#{metrics := #{enable := true}} = Conf, _Old) -> + ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(), + _ = emqx_otel_cpu_sup:start_otel_cpu_sup(Conf), _ = emqx_otel_metrics:stop_otel(), emqx_otel_metrics:start_otel(Conf); ensure_otel_metrics(#{metrics := #{enable := false}}, _Old) -> + ok = emqx_otel_cpu_sup:stop_otel_cpu_sup(), emqx_otel_metrics:stop_otel(); ensure_otel_metrics(_, _) -> ok. diff --git a/apps/emqx_opentelemetry/src/emqx_otel_cpu_sup.erl b/apps/emqx_opentelemetry/src/emqx_otel_cpu_sup.erl new file mode 100644 index 000000000..fc67831be --- /dev/null +++ b/apps/emqx_opentelemetry/src/emqx_otel_cpu_sup.erl @@ -0,0 +1,146 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_otel_cpu_sup). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). + +%% gen_server APIs +-export([start_link/1]). + +-export([ + start_otel_cpu_sup/1, + stop_otel_cpu_sup/0, + stats/1 +]). 
+
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_continue/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(REFRESH, refresh).
+-define(OTEL_CPU_USAGE_WORKER, ?MODULE).
+-define(SUPERVISOR, emqx_otel_sup).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Adds this worker as a child of ?SUPERVISOR.
+%% Returns `ok' when the child was started or is already running,
+%% `{error, Reason}' otherwise (see assert_started/1).
+start_otel_cpu_sup(Conf) ->
+    ChildSpec = emqx_otel_sup:worker_spec(?MODULE, Conf),
+    StartResult = supervisor:start_child(?SUPERVISOR, ChildSpec),
+    assert_started(StartResult).
+
+%% Terminates the worker and removes its child spec from ?SUPERVISOR.
+%% Returns `ok' when the supervisor is not registered or does not know the
+%% child; any other supervisor error is handed back unchanged.
+stop_otel_cpu_sup() ->
+    stop_child(erlang:whereis(?SUPERVISOR)).
+
+stop_child(undefined) ->
+    ok;
+stop_child(SupPid) ->
+    delete_terminated(SupPid, supervisor:terminate_child(SupPid, ?MODULE)).
+
+%% The child spec is only deleted after a successful termination.
+delete_terminated(SupPid, ok) ->
+    supervisor:delete_child(SupPid, ?MODULE);
+delete_terminated(_SupPid, {error, not_found}) ->
+    ok;
+delete_terminated(_SupPid, Error) ->
+    Error.
+
+%% Synchronously asks the worker for the value stored under Name
+%% (e.g. 'cpu.use' or 'cpu.idle'). The call has no timeout.
+stats(Name) ->
+    gen_server:call(?OTEL_CPU_USAGE_WORKER, {stats, Name}, infinity).
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%% simply handle cpu_sup:util/0,1 called in one process
+%%--------------------------------------------------------------------
+
+%% Starts the worker, registered locally as ?OTEL_CPU_USAGE_WORKER.
+start_link(Conf) ->
+    ServerName = {local, ?OTEL_CPU_USAGE_WORKER},
+    gen_server:start_link(ServerName, ?MODULE, Conf, []).
+
+%% Begins with an empty state; the real setup runs in handle_continue/2.
+init(Conf) ->
+    {ok, #{}, {continue, {setup, Conf}}}.
+
+%% Interval is given in milliseconds.
+handle_continue({setup, #{metrics := #{enable := true, interval := Interval}}}, State) ->
+    %% start os_mon temporarily: cpu_sup is one of its services
+    {ok, _} = application:ensure_all_started(os_mon),
+    %% Warm-up call whose result is thrown away: the first value a process gets
+    %% from cpu_sup:util/0 or cpu_sup:util/1 is, on most systems, the CPU
+    %% utilization since system boot. That is not guaranteed, so the value must
+    %% be treated as garbage. The same holds for the first call after a restart
+    %% of cpu_sup.
+    _ = cpu_sup:util(),
+    RefreshTimer = start_refresh_timer(Interval),
+    {noreply, State#{interval => Interval, refresh_time_ref => RefreshTimer}}.
+
+%% {stats, Name} replies with the cached value for Name (0 when unknown).
+%% `stop' cancels the refresh timer and shuts the server down.
+%% Any other request is logged and answered with `ignored'.
+handle_call({stats, Name}, _From, State) ->
+    {reply, get_stats(Name, State), State};
+handle_call(stop, _From, State) ->
+    cancel_outdated_timer(State),
+    %% Reply before stopping; without a reply the caller's gen_server:call
+    %% exits instead of returning.
+    {stop, normal, ok, State};
+handle_call(Req, _From, State) ->
+    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
+    {noreply, State}.
+
+%% The refresh timer fires as {timeout, TimerRef, ?REFRESH}.
+handle_info({timeout, _Timer, ?REFRESH}, State) ->
+    {noreply, refresh(State)};
+handle_info(Info, State) ->
+    %% Log and drop stray messages instead of crashing with function_clause,
+    %% in line with handle_call/3 and handle_cast/2.
+    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Samples cpu_sup:util([]) and caches the busy / non-busy figures under
+%% 'cpu.use' / 'cpu.idle'. Any other result (e.g. an error tuple) resets
+%% both to 0. A new refresh timer is armed on every call.
+refresh(#{interval := Interval} = State) ->
+    NState =
+        case cpu_sup:util([]) of
+            {all, U, I, _} ->
+                State#{'cpu.use' => U, 'cpu.idle' => I};
+            _ ->
+                State#{'cpu.use' => 0, 'cpu.idle' => 0}
+        end,
+    TRef = start_refresh_timer(Interval),
+    NState#{refresh_time_ref => TRef}.
+
+%% Returns the value cached under Name, or 0 when nothing is stored yet.
+get_stats(Name, State) ->
+    maps:get(Name, State, 0).
+
+%% Cancels the refresh timer currently recorded in the state.
+cancel_outdated_timer(#{refresh_time_ref := TRef}) ->
+    emqx_utils:cancel_timer(TRef),
+    ok.
+
+start_refresh_timer(Interval) ->
+    start_timer(Interval, ?REFRESH).
+
+start_timer(Interval, Msg) ->
+    emqx_utils:start_timer(Interval, Msg).
+
+%% Maps a supervisor:start_child/2 result to `ok' (started, or already
+%% running) or `{error, Reason}'.
+assert_started({ok, _Pid}) -> ok;
+assert_started({ok, _Pid, _Info}) -> ok;
+assert_started({error, {already_started, _Pid}}) -> ok;
+assert_started({error, Reason}) -> {error, Reason}.
diff --git a/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl b/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl
index 185a40228..dee51ffe6 100644
--- a/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl
+++ b/apps/emqx_opentelemetry/src/emqx_otel_metrics.erl
@@ -197,6 +197,10 @@ bytes_metrics() ->
 get_stats_gauge(Name) ->
     [{emqx_stats:getstat(Name), #{}}].
 
+%% CPU gauges are served by the emqx_otel_cpu_sup worker; every other name
+%% falls through to emqx_mgmt:vm_stats/1.
+get_vm_gauge(Name) when Name =:= 'cpu.use'; Name =:= 'cpu.idle' ->
+    [{emqx_otel_cpu_sup:stats(Name), #{}}];
 get_vm_gauge(Name) ->
     [{emqx_mgmt:vm_stats(Name), #{}}].
 
@@ -254,8 +258,6 @@ create_counter(Meter, Counters, CallBack) ->
         Counters
     ).
 
-%% Note: list_to_existing_atom("cpu.use") will crash
-%% so we make sure the atom is already existing here
 normalize_name(cpu_use) ->
     'cpu.use';
 normalize_name(cpu_idle) ->
diff --git a/apps/emqx_opentelemetry/src/emqx_otel_sup.erl b/apps/emqx_opentelemetry/src/emqx_otel_sup.erl
index c2894d6e3..f06b5ec2b 100644
--- a/apps/emqx_opentelemetry/src/emqx_otel_sup.erl
+++ b/apps/emqx_opentelemetry/src/emqx_otel_sup.erl
@@ -42,7 +42,12 @@ init([]) ->
     },
     Children =
         case emqx_conf:get([opentelemetry]) of
-            #{metrics := #{enable := false}} -> [];
-            #{metrics := #{enable := true}} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
+            #{metrics := #{enable := false}} ->
+                [];
+            #{metrics := #{enable := true}} = Conf ->
+                [
+                    worker_spec(emqx_otel_metrics, Conf),
+                    worker_spec(emqx_otel_cpu_sup, Conf)
+                ]
         end,
     {ok, {SupFlags, Children}}.