%%-------------------------------------------------------------------- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_metrics_worker). -behaviour(gen_server). -include_lib("stdlib/include/ms_transform.hrl"). %% API functions -export([ start_link/1, stop/1, child_spec/1, child_spec/2 ]). -export([ inc/3, inc/4, observe/4, get/3, get_gauge/3, set_gauge/5, shift_gauge/5, get_gauges/2, delete_gauges/2, get_rate/2, get_slide/2, get_slide/3, get_counters/2, create_metrics/3, create_metrics/4, clear_metrics/2, reset_metrics/2, has_metrics/2 ]). -export([get_metrics/2]). %% gen_server callbacks -export([ init/1, handle_call/3, handle_info/2, handle_cast/2, code_change/3, terminate/2 ]). -ifndef(TEST). -define(SECS_5M, 300). -define(SAMPLING, 10). -else. %% Use 5 secs average rate instead of 5 mins in case of testing -define(SECS_5M, 5). -define(SAMPLING, 1). -endif. -export_type([metrics/0, handler_name/0, metric_id/0, metric_spec/0]). % Default -type metric_type() :: %% Simple counter counter %% Sliding window average | slide. -type metric_spec() :: {metric_type(), atom()}. -type rate() :: #{ current := float(), max := float(), last5m := float() }. -type metrics() :: #{ counters := #{metric_name() => integer()}, gauges := #{metric_name() => integer()}, slides := #{metric_name() => number()}, rate := #{metric_name() => rate()} }. -type handler_name() :: atom(). %% metric_id() is actually a resource id -type metric_id() :: binary() | atom(). -type metric_name() :: atom(). -type worker_id() :: term(). -define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). -define(GAUGE_TABLE(NAME), list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge") ). -record(rate, { max = 0 :: number(), current = 0 :: number(), last5m = 0 :: number(), %% metadata for calculating the avg rate tick = 1 :: number(), last_v = 0 :: number(), %% metadata for calculating the 5min avg rate last5m_acc = 0 :: number(), last5m_smpl = [] :: list() }). -record(slide_datapoint, { sum :: non_neg_integer(), samples :: non_neg_integer(), time :: non_neg_integer() }). -record(slide, { %% Total number of samples through the history n_samples = 0 :: non_neg_integer(), datapoints = [] :: [#slide_datapoint{}] }). -record(state, { metric_ids = sets:new(), rates :: #{metric_id() => #{metric_name() => #rate{}}} | undefined, slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}} }). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ -spec child_spec(handler_name()) -> supervisor:child_spec(). child_spec(Name) -> child_spec(emqx_metrics_worker, Name). child_spec(ChldName, Name) -> #{ id => ChldName, start => {emqx_metrics_worker, start_link, [Name]}, restart => permanent, shutdown => 5000, type => worker, modules => [emqx_metrics_worker] }. -spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics) -> Metrics1 = desugar(Metrics), Counters = filter_counters(Metrics1), create_metrics(Name, Id, Metrics1, Counters). -spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> Metrics1 = desugar(Metrics), gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}). -spec clear_metrics(handler_name(), metric_id()) -> ok. clear_metrics(Name, Id) -> gen_server:call(Name, {delete_metrics, Id}). -spec reset_metrics(handler_name(), metric_id()) -> ok. reset_metrics(Name, Id) -> gen_server:call(Name, {reset_metrics, Id}). -spec has_metrics(handler_name(), metric_id()) -> boolean(). has_metrics(Name, Id) -> case get_ref(Name, Id) of not_found -> false; _ -> true end. -spec get(handler_name(), metric_id(), metric_name() | integer()) -> number(). get(Name, Id, Metric) -> case get_ref(Name, Id) of not_found -> 0; Ref when is_atom(Metric) -> counters:get(Ref, idx_metric(Name, Id, counter, Metric)); Ref when is_integer(Metric) -> counters:get(Ref, Metric) end. -spec get_rate(handler_name(), metric_id()) -> map(). get_rate(Name, Id) -> gen_server:call(Name, {get_rate, Id}). -spec get_counters(handler_name(), metric_id()) -> map(). get_counters(Name, Id) -> maps:map( fun(_Metric, Index) -> get(Name, Id, Index) end, get_indexes(Name, counter, Id) ). -spec get_slide(handler_name(), metric_id()) -> map(). get_slide(Name, Id) -> gen_server:call(Name, {get_slide, Id}). %% Get the average for a specified sliding window period. %% %% It will only account for the samples recorded in the past `Window' seconds. -spec get_slide(handler_name(), metric_id(), non_neg_integer()) -> number(). get_slide(Name, Id, Window) -> gen_server:call(Name, {get_slide, Id, Window}). -spec reset_counters(handler_name(), metric_id()) -> ok. reset_counters(Name, Id) -> case get_ref(Name, Id) of not_found -> ok; Ref -> #{size := Size} = counters:info(Ref), lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, lists:seq(1, Size)) end. -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> #{ rate => get_rate(Name, Id), counters => get_counters(Name, Id), gauges => get_gauges(Name, Id), slides => get_slide(Name, Id) }. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). -spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, counter, Metric), Val). %% Add a sample to the slide. %% %% Slide is short for "sliding window average" type of metric. %% %% It allows to monitor an average of some observed values in time, %% and it's mainly used for performance analysis. For example, it can %% be used to report run time of operations. %% %% Consider an example: %% %% ``` %% emqx_metrics_worker:create_metrics(Name, Id, [{slide, a}]), %% emqx_metrics_worker:observe(Name, Id, a, 10), %% emqx_metrics_worker:observe(Name, Id, a, 30), %% #{a := 20} = emqx_metrics_worker:get_slide(Name, Id, _Window = 1). %% ''' %% %% After recording 2 samples, this metric becomes 20 (the average of 10 and 30). %% %% But after 1 second it becomes 0 again, unless new samples are recorded. %% -spec observe(handler_name(), metric_id(), atom(), integer()) -> ok. observe(Name, Id, Metric, Val) -> #{ref := CRef, slide := Idx} = maps:get(Id, get_pterm(Name)), Index = maps:get(Metric, Idx), %% Update sum: counters:add(CRef, Index, Val), %% Update number of samples: counters:add(CRef, Index + 1, 1). -spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. set_gauge(Name, Id, WorkerId, Metric, Val) -> Table = ?GAUGE_TABLE(Name), try true = ets:insert(Table, {{Id, Metric, WorkerId}, Val}), ok catch error:badarg -> ok end. -spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. shift_gauge(Name, Id, WorkerId, Metric, Val) -> Table = ?GAUGE_TABLE(Name), try _ = ets:update_counter( Table, {Id, Metric, WorkerId}, Val, {{Id, Metric, WorkerId}, 0} ), ok catch error:badarg -> ok end. -spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer(). get_gauge(Name, Id, Metric) -> Table = ?GAUGE_TABLE(Name), MatchSpec = ets:fun2ms( fun({{Id0, Metric0, _WorkerId}, Val}) when Id0 =:= Id, Metric0 =:= Metric -> Val end ), try lists:sum(ets:select(Table, MatchSpec)) catch error:badarg -> 0 end. -spec get_gauges(handler_name(), metric_id()) -> map(). get_gauges(Name, Id) -> Table = ?GAUGE_TABLE(Name), MatchSpec = ets:fun2ms( fun({{Id0, Metric, _WorkerId}, Val}) when Id0 =:= Id -> {Metric, Val} end ), try lists:foldr( fun({Metric, Val}, Acc) -> maps:update_with(Metric, fun(X) -> X + Val end, Val, Acc) end, #{}, ets:select(Table, MatchSpec) ) catch error:badarg -> #{} end. -spec delete_gauges(handler_name(), metric_id()) -> ok. delete_gauges(Name, Id) -> Table = ?GAUGE_TABLE(Name), MatchSpec = ets:fun2ms( fun({{Id0, _Metric, _WorkerId}, _Val}) when Id0 =:= Id -> true end ), try _ = ets:select_delete(Table, MatchSpec), ok catch error:badarg -> ok end. start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). init(Name) -> erlang:process_flag(trap_exit, true), %% the rate metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), persistent_term:put(?CntrRef(Name), #{}), _ = ets:new(?GAUGE_TABLE(Name), [named_table, ordered_set, public, {write_concurrency, true}]), {ok, #state{}}. handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> {reply, make_rate(0, 0, 0), State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> {reply, case maps:get(Id, Rates, undefined) of undefined -> make_rate(0, 0, 0); RatesPerId -> format_rates_of_id(RatesPerId) end, State}; handle_call( {create_metrics, Id, Metrics, RateMetrics}, _From, State = #state{metric_ids = MIDs, rates = Rates, slides = Slides} ) -> case RateMetrics -- filter_counters(Metrics) of [] -> RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), Rate1 = case Rates of undefined -> #{Id => RatePerId}; _ -> Rates#{Id => RatePerId} end, Slides1 = Slides#{Id => create_slides(Metrics)}, {reply, create_counters(get_self_name(), Id, Metrics), State#state{ metric_ids = sets:add_element(Id, MIDs), rates = Rate1, slides = Slides1 }}; _ -> {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State} end; handle_call( {delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rates = Rates, slides = Slides} ) -> Name = get_self_name(), delete_counters(Name, Id), delete_gauges(Name, Id), {reply, ok, State#state{ metric_ids = sets:del_element(Id, MIDs), rates = case Rates of undefined -> undefined; _ -> maps:remove(Id, Rates) end, slides = maps:remove(Id, Slides) }}; handle_call( {reset_metrics, Id}, _From, State = #state{rates = Rates, slides = Slides} ) -> delete_gauges(get_self_name(), Id), NewRates = case Rates of undefined -> undefined; _ -> ResetRate = maps:map( fun(_Key, _Value) -> #rate{} end, maps:get(Id, Rates, #{}) ), maps:put(Id, ResetRate, Rates) end, SlideSpecs = [{slide, I} || I <- maps:keys(maps:get(Id, Slides, #{}))], NewSlides = Slides#{Id => create_slides(SlideSpecs)}, {reply, reset_counters(get_self_name(), Id), State#state{ rates = NewRates, slides = NewSlides }}; handle_call({get_slide, Id}, _From, State = #state{slides = Slides}) -> SlidesForID = maps:get(Id, Slides, #{}), {reply, maps:map(fun(Metric, Slide) -> do_get_slide(Id, Metric, Slide) end, SlidesForID), State}; handle_call({get_slide, Id, Window}, _From, State = #state{slides = Slides}) -> SlidesForID = maps:get(Id, Slides, #{}), {reply, maps:map(fun(Metric, Slide) -> do_get_slide(Window, Id, Metric, Slide) end, SlidesForID), State}; handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. handle_info(ticking, State = #state{rates = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; handle_info(ticking, State = #state{rates = Rates0, slides = Slides0}) -> Rates = maps:map( fun(Id, RatesPerID) -> maps:map( fun(Metric, Rate) -> calculate_rate(get(get_self_name(), Id, Metric), Rate) end, RatesPerID ) end, Rates0 ), Slides = maps:map( fun(Id, SlidesPerID) -> maps:map( fun(Metric, Slide) -> update_slide(Id, Metric, Slide) end, SlidesPerID ) end, Slides0 ), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State#state{rates = Rates, slides = Slides}}; handle_info(_Info, State) -> {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, #state{metric_ids = MIDs}) -> Name = get_self_name(), [delete_counters(Name, Id) || Id <- sets:to_list(MIDs)], persistent_term:erase(?CntrRef(Name)). stop(Name) -> try gen_server:stop(Name, normal, 10000) catch exit:noproc -> ok; exit:timeout -> %% after timeout, the process killed by gen.erl ok end. %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ create_counters(_Name, _Id, []) -> error({create_counter_error, must_provide_a_list_of_metrics}); create_counters(Name, Id, Metrics) -> %% backup the old counters OlderCounters = maps:with(filter_counters(Metrics), get_counters(Name, Id)), %% create the new counter {Size, Indexes} = create_metric_indexes(Metrics), Counters = get_pterm(Name), CntrRef = counters:new(Size, [write_concurrency]), persistent_term:put( ?CntrRef(Name), Counters#{Id => Indexes#{ref => CntrRef}} ), %% Restore the old counters. Slides are not restored, since they %% are periodically zeroed anyway. We do lose some samples in the %% current interval, but that's acceptable for now. lists:foreach( fun({Metric, N}) -> inc(Name, Id, Metric, N) end, maps:to_list(OlderCounters) ). create_metric_indexes(Metrics) -> create_metric_indexes(Metrics, 1, [], []). create_metric_indexes([], Size, Counters, Slides) -> {Size, #{counter => maps:from_list(Counters), slide => maps:from_list(Slides)}}; create_metric_indexes([{counter, Id} | Rest], Index, Counters, Slides) -> create_metric_indexes(Rest, Index + 1, [{Id, Index} | Counters], Slides); create_metric_indexes([{slide, Id} | Rest], Index, Counters, Slides) -> create_metric_indexes(Rest, Index + 2, Counters, [{Id, Index} | Slides]). delete_counters(Name, Id) -> persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))). get_ref(Name, Id) -> case maps:find(Id, get_pterm(Name)) of {ok, #{ref := Ref}} -> Ref; error -> not_found end. idx_metric(Name, Id, Type, Metric) -> maps:get(Metric, get_indexes(Name, Type, Id)). get_indexes(Name, Type, Id) -> case maps:find(Id, get_pterm(Name)) of {ok, #{Type := Indexes}} -> Indexes; error -> #{} end. get_pterm(Name) -> persistent_term:get(?CntrRef(Name), #{}). calculate_rate(_CurrVal, undefined) -> undefined; calculate_rate(CurrVal, #rate{ max = MaxRate0, last_v = LastVal, tick = Tick, last5m_acc = AccRate5Min0, last5m_smpl = Last5MinSamples0 }) -> %% calculate the current rate based on the last value of the counter CurrRate = (CurrVal - LastVal) / ?SAMPLING, %% calculate the max rate since the emqx startup MaxRate = case MaxRate0 >= CurrRate of true -> MaxRate0; false -> CurrRate end, %% calculate the average rate in last 5 mins {Last5MinSamples, Acc5Min, Last5Min} = case Tick =< ?SAMPCOUNT_5M of true -> Acc = AccRate5Min0 + CurrRate, {lists:reverse([CurrRate | lists:reverse(Last5MinSamples0)]), Acc, Acc / Tick}; false -> [FirstRate | Rates] = Last5MinSamples0, Acc = AccRate5Min0 + CurrRate - FirstRate, {lists:reverse([CurrRate | lists:reverse(Rates)]), Acc, Acc / ?SAMPCOUNT_5M} end, #rate{ max = MaxRate, current = CurrRate, last5m = Last5Min, last_v = CurrVal, last5m_acc = Acc5Min, last5m_smpl = Last5MinSamples, tick = Tick + 1 }. do_get_slide(Id, Metric, S = #slide{n_samples = NSamples}) -> #{ n_samples => NSamples, current => do_get_slide(2, Id, Metric, S), last5m => do_get_slide(?SECS_5M, Id, Metric, S) }. do_get_slide(Window, Id, Metric, #slide{datapoints = DP0}) -> Datapoint = get_slide_datapoint(Id, Metric), {N, Sum} = get_slide_window(os:system_time(second) - Window, [Datapoint | DP0], 0, 0), case N > 0 of true -> Sum div N; false -> 0 end. get_slide_window(_StartTime, [], N, S) -> {N, S}; get_slide_window(StartTime, [#slide_datapoint{time = T} | _], N, S) when T < StartTime -> {N, S}; get_slide_window(StartTime, [#slide_datapoint{samples = N, sum = S} | Rest], AccN, AccS) -> get_slide_window(StartTime, Rest, AccN + N, AccS + S). get_slide_datapoint(Id, Metric) -> Name = get_self_name(), CRef = get_ref(Name, Id), Index = idx_metric(Name, Id, slide, Metric), Total = counters:get(CRef, Index), N = counters:get(CRef, Index + 1), #slide_datapoint{ sum = Total, samples = N, time = os:system_time(second) }. update_slide(Id, Metric, Slide0 = #slide{n_samples = NSamples, datapoints = DPs}) -> Datapoint = get_slide_datapoint(Id, Metric), %% Reset counters: Name = get_self_name(), CRef = get_ref(Name, Id), Index = idx_metric(Name, Id, slide, Metric), counters:put(CRef, Index, 0), counters:put(CRef, Index + 1, 0), Slide0#slide{ datapoints = [Datapoint | lists:droplast(DPs)], n_samples = Datapoint#slide_datapoint.samples + NSamples }. format_rates_of_id(RatesPerId) -> maps:map( fun(_Metric, Rates) -> format_rate(Rates) end, RatesPerId ). format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> make_rate(Current, Max, Last5Min). make_rate(Current, Max, Last5Min) -> #{ current => precision(Current, 2), max => precision(Max, 2), last5m => precision(Last5Min, 2) }. precision(Float, N) -> Base = math:pow(10, N), round(Float * Base) / Base. desugar(Metrics) -> lists:map( fun (Atom) when is_atom(Atom) -> {counter, Atom}; (Spec = {_, _}) -> Spec end, Metrics ). filter_counters(Metrics) -> [K || {counter, K} <- Metrics]. create_slides(Metrics) -> EmptyDatapoints = [ #slide_datapoint{sum = 0, samples = 0, time = 0} || _ <- lists:seq(1, ?SECS_5M div ?SAMPLING) ], maps:from_list([{K, #slide{datapoints = EmptyDatapoints}} || {slide, K} <- Metrics]). get_self_name() -> {registered_name, Name} = process_info(self(), registered_name), Name.