diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index 241ba599f..5f41346cb 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -31,6 +31,7 @@ -export([ inc/3, inc/4, + observe/4, get/3, get_gauge/3, set_gauge/5, @@ -38,6 +39,8 @@ get_gauges/2, delete_gauges/2, get_rate/2, + get_slide/2, + get_slide/3, get_counters/2, create_metrics/3, create_metrics/4, @@ -67,7 +70,16 @@ -define(SAMPLING, 1). -endif. --export_type([metrics/0, handler_name/0, metric_id/0]). +-export_type([metrics/0, handler_name/0, metric_id/0, metric_spec/0]). + +% Default +-type metric_type() :: + %% Simple counter + counter + %% Sliding window average + | slide. + +-type metric_spec() :: {metric_type(), atom()}. -type rate() :: #{ current := float(), @@ -77,6 +89,7 @@ -type metrics() :: #{ counters := #{metric_name() => integer()}, gauges := #{metric_name() => integer()}, + slides := #{metric_name() => number()}, rate := #{metric_name() => rate()} }. -type handler_name() :: atom(). @@ -103,9 +116,22 @@ last5m_smpl = [] :: list() }). +-record(slide_datapoint, { + sum :: non_neg_integer(), + samples :: non_neg_integer(), + time :: non_neg_integer() +}). + +-record(slide, { + %% Total number of samples through the history + n_samples = 0 :: non_neg_integer(), + datapoints = [] :: [#slide_datapoint{}] +}). + -record(state, { metric_ids = sets:new(), - rates :: undefined | #{metric_id() => #rate{}} + rates :: #{metric_id() => #{metric_name() => #rate{}}} | undefined, + slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}} }). %%------------------------------------------------------------------------------ @@ -126,14 +152,18 @@ child_spec(ChldName, Name) -> modules => [emqx_metrics_worker] }. --spec create_metrics(handler_name(), metric_id(), [metric_name()]) -> ok | {error, term()}. +-spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()]) -> + ok | {error, term()}. create_metrics(Name, Id, Metrics) -> - create_metrics(Name, Id, Metrics, Metrics). + Metrics1 = desugar(Metrics), + Counters = filter_counters(Metrics1), + create_metrics(Name, Id, Metrics1, Counters). --spec create_metrics(handler_name(), metric_id(), [metric_name()], [metric_name()]) -> +-spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) -> ok | {error, term()}. create_metrics(Name, Id, Metrics, RateMetrics) -> - gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). + Metrics1 = desugar(Metrics), + gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}). -spec clear_metrics(handler_name(), metric_id()) -> ok. clear_metrics(Name, Id) -> @@ -156,7 +186,7 @@ get(Name, Id, Metric) -> not_found -> 0; Ref when is_atom(Metric) -> - counters:get(Ref, idx_metric(Name, Id, Metric)); + counters:get(Ref, idx_metric(Name, Id, counter, Metric)); Ref when is_integer(Metric) -> counters:get(Ref, Metric) end. @@ -171,21 +201,37 @@ get_counters(Name, Id) -> fun(_Metric, Index) -> get(Name, Id, Index) end, - get_indexes(Name, Id) + get_indexes(Name, counter, Id) ). +-spec get_slide(handler_name(), metric_id()) -> map(). +get_slide(Name, Id) -> + gen_server:call(Name, {get_slide, Id}). + +%% Get the average for a specified sliding window period. +%% +%% It will only account for the samples recorded in the past `Window' seconds. +-spec get_slide(handler_name(), metric_id(), non_neg_integer()) -> number(). +get_slide(Name, Id, Window) -> + gen_server:call(Name, {get_slide, Id, Window}). + -spec reset_counters(handler_name(), metric_id()) -> ok. reset_counters(Name, Id) -> - Indexes = maps:values(get_indexes(Name, Id)), - Ref = get_ref(Name, Id), - lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, Indexes). + case get_ref(Name, Id) of + not_found -> + ok; + Ref -> + #{size := Size} = counters:info(Ref), + lists:foreach(fun(Idx) -> counters:put(Ref, Idx, 0) end, lists:seq(1, Size)) + end. -spec get_metrics(handler_name(), metric_id()) -> metrics(). get_metrics(Name, Id) -> #{ rate => get_rate(Name, Id), counters => get_counters(Name, Id), - gauges => get_gauges(Name, Id) + gauges => get_gauges(Name, Id), + slides => get_slide(Name, Id) }. -spec inc(handler_name(), metric_id(), atom()) -> ok. @@ -194,7 +240,37 @@ inc(Name, Id, Metric) -> -spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok. inc(Name, Id, Metric, Val) -> - counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). + counters:add(get_ref(Name, Id), idx_metric(Name, Id, counter, Metric), Val). + +%% Add a sample to the slide. +%% +%% Slide is short for "sliding window average" type of metric. +%% +%% It allows to monitor an average of some observed values in time, +%% and it's mainly used for performance analysis. For example, it can +%% be used to report run time of operations. +%% +%% Consider an example: +%% +%% ``` +%% emqx_metrics_worker:create_metrics(Name, Id, [{slide, a}]), +%% emqx_metrics_worker:observe(Name, Id, a, 10), +%% emqx_metrics_worker:observe(Name, Id, a, 30), +%% #{a := 20} = emqx_metrics_worker:get_slide(Name, Id, _Window = 1). +%% ''' +%% +%% After recording 2 samples, this metric becomes 20 (the average of 10 and 30). +%% +%% But after 1 second it becomes 0 again, unless new samples are recorded. +%% +-spec observe(handler_name(), metric_id(), atom(), integer()) -> ok. +observe(Name, Id, Metric, Val) -> + #{ref := CRef, slide := Idx} = maps:get(Id, get_pterm(Name)), + Index = maps:get(Metric, Idx), + %% Update sum: + counters:add(CRef, Index, Val), + %% Update number of samples: + counters:add(CRef, Index + 1, 1). -spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok. set_gauge(Name, Id, WorkerId, Metric, Val) -> @@ -300,9 +376,9 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> handle_call( {create_metrics, Id, Metrics, RateMetrics}, _From, - State = #state{metric_ids = MIDs, rates = Rates} + State = #state{metric_ids = MIDs, rates = Rates, slides = Slides} ) -> - case RateMetrics -- Metrics of + case RateMetrics -- filter_counters(Metrics) of [] -> RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), Rate1 = @@ -310,9 +386,11 @@ handle_call( undefined -> #{Id => RatePerId}; _ -> Rates#{Id => RatePerId} end, + Slides1 = Slides#{Id => create_slides(Metrics)}, {reply, create_counters(get_self_name(), Id, Metrics), State#state{ metric_ids = sets:add_element(Id, MIDs), - rates = Rate1 + rates = Rate1, + slides = Slides1 }}; _ -> {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State} @@ -320,7 +398,7 @@ handle_call( handle_call( {delete_metrics, Id}, _From, - State = #state{metric_ids = MIDs, rates = Rates} + State = #state{metric_ids = MIDs, rates = Rates, slides = Slides} ) -> Name = get_self_name(), delete_counters(Name, Id), @@ -331,29 +409,43 @@ handle_call( case Rates of undefined -> undefined; _ -> maps:remove(Id, Rates) - end + end, + slides = maps:remove(Id, Slides) }}; handle_call( {reset_metrics, Id}, _From, - State = #state{rates = Rates} + State = #state{rates = Rates, slides = Slides} ) -> - Name = get_self_name(), - delete_gauges(Name, Id), - {reply, reset_counters(Name, Id), State#state{ + delete_gauges(get_self_name(), Id), + NewRates = + case Rates of + undefined -> + undefined; + _ -> + ResetRate = + maps:map( + fun(_Key, _Value) -> #rate{} end, + maps:get(Id, Rates, #{}) + ), + maps:put(Id, ResetRate, Rates) + end, + SlideSpecs = [{slide, I} || I <- maps:keys(maps:get(Id, Slides, #{}))], + NewSlides = Slides#{Id => create_slides(SlideSpecs)}, + {reply, reset_counters(get_self_name(), Id), State#state{ rates = - case Rates of - undefined -> - undefined; - _ -> - ResetRate = - maps:map( - fun(_Key, _Value) -> #rate{} end, - maps:get(Id, Rates, #{}) - ), - maps:put(Id, ResetRate, Rates) - end + NewRates, + slides = NewSlides }}; +handle_call({get_slide, Id}, _From, State = #state{slides = Slides}) -> + SlidesForID = maps:get(Id, Slides, #{}), + {reply, maps:map(fun(Metric, Slide) -> do_get_slide(Id, Metric, Slide) end, SlidesForID), + State}; +handle_call({get_slide, Id, Window}, _From, State = #state{slides = Slides}) -> + SlidesForID = maps:get(Id, Slides, #{}), + {reply, + maps:map(fun(Metric, Slide) -> do_get_slide(Window, Id, Metric, Slide) end, SlidesForID), + State}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -363,7 +455,7 @@ handle_cast(_Msg, State) -> handle_info(ticking, State = #state{rates = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; -handle_info(ticking, State = #state{rates = Rates0}) -> +handle_info(ticking, State = #state{rates = Rates0, slides = Slides0}) -> Rates = maps:map( fun(Id, RatesPerID) -> @@ -376,8 +468,20 @@ handle_info(ticking, State = #state{rates = Rates0}) -> end, Rates0 ), + Slides = + maps:map( + fun(Id, SlidesPerID) -> + maps:map( + fun(Metric, Slide) -> + update_slide(Id, Metric, Slide) + end, + SlidesPerID + ) + end, + Slides0 + ), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {noreply, State#state{rates = Rates}}; + {noreply, State#state{rates = Rates, slides = Slides}}; handle_info(_Info, State) -> {noreply, State}. @@ -408,17 +512,18 @@ create_counters(_Name, _Id, []) -> error({create_counter_error, must_provide_a_list_of_metrics}); create_counters(Name, Id, Metrics) -> %% backup the old counters - OlderCounters = maps:with(Metrics, get_counters(Name, Id)), + OlderCounters = maps:with(filter_counters(Metrics), get_counters(Name, Id)), %% create the new counter - Size = length(Metrics), - Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))), + {Size, Indexes} = create_metric_indexes(Metrics), Counters = get_pterm(Name), CntrRef = counters:new(Size, [write_concurrency]), persistent_term:put( ?CntrRef(Name), - Counters#{Id => #{ref => CntrRef, indexes => Indexes}} + Counters#{Id => Indexes#{ref => CntrRef}} ), - %% restore the old counters + %% Restore the old counters. Slides are not restored, since they + %% are periodically zeroed anyway. We do lose some samples in the + %% current interval, but that's acceptable for now. lists:foreach( fun({Metric, N}) -> inc(Name, Id, Metric, N) @@ -426,6 +531,16 @@ create_counters(Name, Id, Metrics) -> maps:to_list(OlderCounters) ). +create_metric_indexes(Metrics) -> + create_metric_indexes(Metrics, 1, [], []). + +create_metric_indexes([], Size, Counters, Slides) -> + {Size, #{counter => maps:from_list(Counters), slide => maps:from_list(Slides)}}; +create_metric_indexes([{counter, Id} | Rest], Index, Counters, Slides) -> + create_metric_indexes(Rest, Index + 1, [{Id, Index} | Counters], Slides); +create_metric_indexes([{slide, Id} | Rest], Index, Counters, Slides) -> + create_metric_indexes(Rest, Index + 2, Counters, [{Id, Index} | Slides]). + delete_counters(Name, Id) -> persistent_term:put(?CntrRef(Name), maps:remove(Id, get_pterm(Name))). @@ -435,12 +550,12 @@ get_ref(Name, Id) -> error -> not_found end. -idx_metric(Name, Id, Metric) -> - maps:get(Metric, get_indexes(Name, Id)). +idx_metric(Name, Id, Type, Metric) -> + maps:get(Metric, get_indexes(Name, Type, Id)). -get_indexes(Name, Id) -> +get_indexes(Name, Type, Id) -> case maps:find(Id, get_pterm(Name)) of - {ok, #{indexes := Indexes}} -> Indexes; + {ok, #{Type := Indexes}} -> Indexes; error -> #{} end. @@ -488,6 +603,53 @@ calculate_rate(CurrVal, #rate{ tick = Tick + 1 }. +do_get_slide(Id, Metric, S = #slide{n_samples = NSamples}) -> + #{ + n_samples => NSamples, + current => do_get_slide(2, Id, Metric, S), + last5m => do_get_slide(?SECS_5M, Id, Metric, S) + }. + +do_get_slide(Window, Id, Metric, #slide{datapoints = DP0}) -> + Datapoint = get_slide_datapoint(Id, Metric), + {N, Sum} = get_slide_window(os:system_time(second) - Window, [Datapoint | DP0], 0, 0), + case N > 0 of + true -> Sum div N; + false -> 0 + end. + +get_slide_window(_StartTime, [], N, S) -> + {N, S}; +get_slide_window(StartTime, [#slide_datapoint{time = T} | _], N, S) when T < StartTime -> + {N, S}; +get_slide_window(StartTime, [#slide_datapoint{samples = N, sum = S} | Rest], AccN, AccS) -> + get_slide_window(StartTime, Rest, AccN + N, AccS + S). + +get_slide_datapoint(Id, Metric) -> + Name = get_self_name(), + CRef = get_ref(Name, Id), + Index = idx_metric(Name, Id, slide, Metric), + Total = counters:get(CRef, Index), + N = counters:get(CRef, Index + 1), + #slide_datapoint{ + sum = Total, + samples = N, + time = os:system_time(second) + }. + +update_slide(Id, Metric, Slide0 = #slide{n_samples = NSamples, datapoints = DPs}) -> + Datapoint = get_slide_datapoint(Id, Metric), + %% Reset counters: + Name = get_self_name(), + CRef = get_ref(Name, Id), + Index = idx_metric(Name, Id, slide, Metric), + counters:put(CRef, Index, 0), + counters:put(CRef, Index + 1, 0), + Slide0#slide{ + datapoints = [Datapoint | lists:droplast(DPs)], + n_samples = Datapoint#slide_datapoint.samples + NSamples + }. + format_rates_of_id(RatesPerId) -> maps:map( fun(_Metric, Rates) -> @@ -510,6 +672,27 @@ precision(Float, N) -> Base = math:pow(10, N), round(Float * Base) / Base. +desugar(Metrics) -> + lists:map( + fun + (Atom) when is_atom(Atom) -> + {counter, Atom}; + (Spec = {_, _}) -> + Spec + end, + Metrics + ). + +filter_counters(Metrics) -> + [K || {counter, K} <- Metrics]. + +create_slides(Metrics) -> + EmptyDatapoints = [ + #slide_datapoint{sum = 0, samples = 0, time = 0} + || _ <- lists:seq(1, ?SECS_5M div ?SAMPLING) + ], + maps:from_list([{K, #slide{datapoints = EmptyDatapoints}} || {slide, K} <- Metrics]). + get_self_name() -> {registered_name, Name} = process_info(self(), registered_name), Name. diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx/test/emqx_metrics_worker_SUITE.erl index 113e8650f..194c9cc99 100644 --- a/apps/emqx/test/emqx_metrics_worker_SUITE.erl +++ b/apps/emqx/test/emqx_metrics_worker_SUITE.erl @@ -46,7 +46,7 @@ end_per_testcase(_, _Config) -> ok. t_get_metrics(_) -> - Metrics = [a, b, c], + Metrics = [a, b, c, {slide, d}], Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start @@ -73,6 +73,8 @@ t_get_metrics(_) -> ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), + ok = emqx_metrics_worker:observe(?NAME, Id, d, 10), + ok = emqx_metrics_worker:observe(?NAME, Id, d, 30), ct:sleep(1500), ?LET( #{ @@ -89,6 +91,9 @@ t_get_metrics(_) -> a := 1, b := 1, c := 2 + } = Counters, + slides := #{ + d := #{n_samples := 2, last5m := 20, current := _} } }, emqx_metrics_worker:get_metrics(?NAME, Id), @@ -100,7 +105,8 @@ t_get_metrics(_) -> ?assert(MaxB > 0), ?assert(MaxC > 0), ?assert(Inflight == 12), - ?assert(Queuing == 9) + ?assert(Queuing == 9), + ?assertNot(maps:is_key(d, Counters)) } ), ok = emqx_metrics_worker:clear_metrics(?NAME, Id). @@ -117,6 +123,7 @@ t_clear_metrics(_Config) -> c := #{current := 0.0, max := 0.0, last5m := 0.0} }, gauges := #{}, + slides := #{}, counters := #{ a := 0, b := 0, @@ -138,14 +145,15 @@ t_clear_metrics(_Config) -> #{ counters => #{}, gauges => #{}, - rate => #{current => 0.0, last5m => 0.0, max => 0.0} + rate => #{current => 0.0, last5m => 0.0, max => 0.0}, + slides => #{} }, emqx_metrics_worker:get_metrics(?NAME, Id) ), ok. t_reset_metrics(_) -> - Metrics = [a, b, c], + Metrics = [a, b, c, {slide, d}], Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics(?NAME, Id, Metrics), %% all the metrics are set to zero at start @@ -161,6 +169,9 @@ t_reset_metrics(_) -> a := 0, b := 0, c := 0 + }, + slides := #{ + d := #{n_samples := 0, last5m := 0, current := 0} } }, emqx_metrics_worker:get_metrics(?NAME, Id) @@ -172,7 +183,12 @@ t_reset_metrics(_) -> ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id0, inflight, 5), ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id1, inflight, 7), ok = emqx_metrics_worker:set_gauge(?NAME, Id, worker_id2, queuing, 9), + ok = emqx_metrics_worker:observe(?NAME, Id, d, 100), + ok = emqx_metrics_worker:observe(?NAME, Id, d, 200), ct:sleep(1500), + ?assertMatch( + #{d := #{n_samples := 2}}, emqx_metrics_worker:get_slide(?NAME, <<"testid">>) + ), ok = emqx_metrics_worker:reset_metrics(?NAME, Id), ?LET( #{ @@ -186,6 +202,9 @@ t_reset_metrics(_) -> a := 0, b := 0, c := 0 + }, + slides := #{ + d := #{n_samples := 0, last5m := 0, current := 0} } }, emqx_metrics_worker:get_metrics(?NAME, Id), @@ -202,7 +221,7 @@ t_reset_metrics(_) -> ok = emqx_metrics_worker:clear_metrics(?NAME, Id). t_get_metrics_2(_) -> - Metrics = [a, b, c], + Metrics = [a, b, c, {slide, d}], Id = <<"testid">>, ok = emqx_metrics_worker:create_metrics( ?NAME, diff --git a/changes/v5.0.14/feat-9671.en.md b/changes/v5.0.14/feat-9671.en.md new file mode 100644 index 000000000..dd5ed5e4d --- /dev/null +++ b/changes/v5.0.14/feat-9671.en.md @@ -0,0 +1 @@ +Implement sliding window average metrics. diff --git a/changes/v5.0.14/feat-9671.zh.md b/changes/v5.0.14/feat-9671.zh.md new file mode 100644 index 000000000..cbcae01fe --- /dev/null +++ b/changes/v5.0.14/feat-9671.zh.md @@ -0,0 +1 @@ +实施滑动窗口平均度量。