From abb74056bd6a281b8c9b34d0c59b2ba2b2c0ec39 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 2 Nov 2020 12:12:06 +0100 Subject: [PATCH 1/2] refactor(metrics): Use modified moving average for topic metrics speed --- apps/emqx/src/emqx_mod_topic_metrics.erl | 39 +++++++++--------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/apps/emqx/src/emqx_mod_topic_metrics.erl b/apps/emqx/src/emqx_mod_topic_metrics.erl index cfcf1ae79..2041acb96 100644 --- a/apps/emqx/src/emqx_mod_topic_metrics.erl +++ b/apps/emqx/src/emqx_mod_topic_metrics.erl @@ -43,7 +43,6 @@ -export([ inc/2 , inc/3 , val/2 - , rate/2 , metrics/1 , register/1 , unregister/1 @@ -52,6 +51,10 @@ , all_registered_topics/0 ]). +%% Exposed for test only. +-export([ rate/2 + ]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -78,13 +81,11 @@ ]). -define(TICKING_INTERVAL, 1). +-define(SPEED_AVERAGE_WINDOW_SIZE, 5). -record(speed, { last = 0 :: number(), - tick = 1 :: number(), - last_v = 0 :: number(), - acc = 0 :: number(), - samples = [] :: list() + last_v = 0 :: number() }). -record(state, { @@ -358,25 +359,15 @@ counters_size() -> number_of_registered_topics() -> proplists:get_value(size, ets:info(?TAB)). -calculate_speed(CurVal, #speed{last_v = LastVal, tick = Tick, acc = Acc, samples = Samples}) -> +calculate_speed(CurVal, #speed{last = Last, + last_v = LastVal + }) -> %% calculate the current speed based on the last value of the counter CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL, + #speed{last = mma(?SPEED_AVERAGE_WINDOW_SIZE, Last, CurSpeed), + last_v = CurVal + }. - %% calculate the average speed in last 5 seconds - case Tick < 5 of - true -> - Acc1 = Acc + CurSpeed, - #speed{last = Acc1 / Tick, - last_v = CurVal, - acc = Acc1, - samples = Samples ++ [CurSpeed], - tick = Tick + 1}; - false -> - [FirstSpeed | Speeds] = Samples, - Acc1 = Acc + CurSpeed - FirstSpeed, - #speed{last = Acc1 / Tick, - last_v = CurVal, - acc = Acc1, - samples = Speeds ++ [CurSpeed], - tick = Tick} - end. +%% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average +mma(WindowSize, LastSpeed, CurSpeed) -> + (LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize. From 4eef9f691e4e181e2a2e43d80f39e777e0ce1403 Mon Sep 17 00:00:00 2001 From: "ayodele.akingbule" Date: Mon, 2 Nov 2020 23:51:44 +0100 Subject: [PATCH 2/2] refactor(metrics): Add short, medium and long windowsize and expose stats --- apps/emqx/src/emqx_mod_topic_metrics.erl | 54 ++++++++++++++----- .../test/emqx_mod_topic_metrics_SUITE.erl | 5 +- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/apps/emqx/src/emqx_mod_topic_metrics.erl b/apps/emqx/src/emqx_mod_topic_metrics.erl index 2041acb96..9e7abd3a6 100644 --- a/apps/emqx/src/emqx_mod_topic_metrics.erl +++ b/apps/emqx/src/emqx_mod_topic_metrics.erl @@ -43,6 +43,7 @@ -export([ inc/2 , inc/3 , val/2 + , rate/2 , metrics/1 , register/1 , unregister/1 @@ -51,9 +52,8 @@ , all_registered_topics/0 ]). -%% Exposed for test only. --export([ rate/2 - ]). +%% stats. +-export([ rates/2 ]). %% gen_server callbacks -export([ init/1 @@ -82,10 +82,14 @@ -define(TICKING_INTERVAL, 1). -define(SPEED_AVERAGE_WINDOW_SIZE, 5). +-define(SPEED_MEDIUM_WINDOW_SIZE, 60). +-define(SPEED_LONG_WINDOW_SIZE, 300). -record(speed, { last = 0 :: number(), - last_v = 0 :: number() + last_v = 0 :: number(), + last_medium = 0 :: number(), + last_long = 0 :: number() }). -record(state, { @@ -181,7 +185,15 @@ val(Topic, Metric) -> end. rate(Topic, Metric) -> - gen_server:call(?MODULE, {get_rate, Topic, Metric}). + case rates(Topic, Metric) of + #{short := Last} -> + Last; + {error, Reason} -> + {error, Reason} + end. + +rates(Topic, Metric) -> + gen_server:call(?MODULE, {get_rates, Topic, Metric}). metrics(Topic) -> case ets:lookup(?TAB, Topic) of @@ -254,7 +266,7 @@ handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) -> {reply, ok, State#state{speeds = NSpeeds}} end; -handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> +handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> {reply, {error, topic_not_found}, State}; @@ -262,8 +274,8 @@ handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) - case maps:get({Topic, Metric}, Speeds, undefined) of undefined -> {reply, {error, invalid_metric}, State}; - #speed{last = Last} -> - {reply, Last, State} + #speed{last = Short, last_medium = Medium, last_long = Long} -> + {reply, #{ short => Short, medium => Medium, long => Long }, State} end end. @@ -360,14 +372,28 @@ number_of_registered_topics() -> proplists:get_value(size, ets:info(?TAB)). calculate_speed(CurVal, #speed{last = Last, - last_v = LastVal - }) -> + last_v = LastVal, + last_medium = LastMedium, + last_long = LastLong +}) -> %% calculate the current speed based on the last value of the counter CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL, - #speed{last = mma(?SPEED_AVERAGE_WINDOW_SIZE, Last, CurSpeed), - last_v = CurVal - }. + #speed{ + last_v = CurVal, + last = short_mma(Last, CurSpeed), + last_medium = medium_mma(LastMedium, CurSpeed), + last_long = long_mma(LastLong, CurSpeed) + }. %% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average mma(WindowSize, LastSpeed, CurSpeed) -> - (LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize. + (LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize. + +short_mma(LastSpeed, CurSpeed) -> + mma(?SPEED_AVERAGE_WINDOW_SIZE, LastSpeed, CurSpeed). + +medium_mma(LastSpeed, CurSpeed) -> + mma(?SPEED_MEDIUM_WINDOW_SIZE, LastSpeed, CurSpeed). + +long_mma(LastSpeed, CurSpeed) -> + mma(?SPEED_LONG_WINDOW_SIZE, LastSpeed, CurSpeed). diff --git a/apps/emqx/test/emqx_mod_topic_metrics_SUITE.erl b/apps/emqx/test/emqx_mod_topic_metrics_SUITE.erl index 4ac1b00eb..188b5881b 100644 --- a/apps/emqx/test/emqx_mod_topic_metrics_SUITE.erl +++ b/apps/emqx/test/emqx_mod_topic_metrics_SUITE.erl @@ -36,11 +36,13 @@ t_nonexistent_topic_metrics(_) -> ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in')), emqx_mod_topic_metrics:register(<<"a/b/c">>), ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), + ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')), emqx_mod_topic_metrics:unregister(<<"a/b/c">>), emqx_mod_topic_metrics:unload([]). @@ -57,6 +59,7 @@ t_topic_metrics(_) -> ?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), + ?assert(emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}), emqx_mod_topic_metrics:unregister(<<"a/b/c">>), emqx_mod_topic_metrics:unload([]). @@ -89,4 +92,4 @@ t_hook(_) -> ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), emqx_mod_topic_metrics:unregister(<<"a/b/c">>), - emqx_mod_topic_metrics:unload([]). \ No newline at end of file + emqx_mod_topic_metrics:unload([]).