refactor(metrics): Add short, medium and long windowsize and expose stats

This commit is contained in:
ayodele.akingbule 2020-11-02 23:51:44 +01:00 committed by Zaiming Shi
parent abb74056bd
commit 4eef9f691e
2 changed files with 44 additions and 15 deletions

View File

@ -43,6 +43,7 @@
-export([ inc/2 -export([ inc/2
, inc/3 , inc/3
, val/2 , val/2
, rate/2
, metrics/1 , metrics/1
, register/1 , register/1
, unregister/1 , unregister/1
@ -51,9 +52,8 @@
, all_registered_topics/0 , all_registered_topics/0
]). ]).
%% Exposed for test only. %% stats.
-export([ rate/2 -export([ rates/2 ]).
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
@ -82,10 +82,14 @@
-define(TICKING_INTERVAL, 1). -define(TICKING_INTERVAL, 1).
-define(SPEED_AVERAGE_WINDOW_SIZE, 5). -define(SPEED_AVERAGE_WINDOW_SIZE, 5).
-define(SPEED_MEDIUM_WINDOW_SIZE, 60).
-define(SPEED_LONG_WINDOW_SIZE, 300).
-record(speed, { -record(speed, {
last = 0 :: number(), last = 0 :: number(),
last_v = 0 :: number() last_v = 0 :: number(),
last_medium = 0 :: number(),
last_long = 0 :: number()
}). }).
-record(state, { -record(state, {
@ -181,7 +185,15 @@ val(Topic, Metric) ->
end. end.
rate(Topic, Metric) -> rate(Topic, Metric) ->
gen_server:call(?MODULE, {get_rate, Topic, Metric}). case rates(Topic, Metric) of
#{short := Last} ->
Last;
{error, Reason} ->
{error, Reason}
end.
rates(Topic, Metric) ->
gen_server:call(?MODULE, {get_rates, Topic, Metric}).
metrics(Topic) -> metrics(Topic) ->
case ets:lookup(?TAB, Topic) of case ets:lookup(?TAB, Topic) of
@ -254,7 +266,7 @@ handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) ->
{reply, ok, State#state{speeds = NSpeeds}} {reply, ok, State#state{speeds = NSpeeds}}
end; end;
handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of case is_registered(Topic) of
false -> false ->
{reply, {error, topic_not_found}, State}; {reply, {error, topic_not_found}, State};
@ -262,8 +274,8 @@ handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) -
case maps:get({Topic, Metric}, Speeds, undefined) of case maps:get({Topic, Metric}, Speeds, undefined) of
undefined -> undefined ->
{reply, {error, invalid_metric}, State}; {reply, {error, invalid_metric}, State};
#speed{last = Last} -> #speed{last = Short, last_medium = Medium, last_long = Long} ->
{reply, Last, State} {reply, #{ short => Short, medium => Medium, long => Long }, State}
end end
end. end.
@ -360,14 +372,28 @@ number_of_registered_topics() ->
proplists:get_value(size, ets:info(?TAB)). proplists:get_value(size, ets:info(?TAB)).
calculate_speed(CurVal, #speed{last = Last, calculate_speed(CurVal, #speed{last = Last,
last_v = LastVal last_v = LastVal,
}) -> last_medium = LastMedium,
last_long = LastLong
}) ->
%% calculate the current speed based on the last value of the counter %% calculate the current speed based on the last value of the counter
CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL, CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL,
#speed{last = mma(?SPEED_AVERAGE_WINDOW_SIZE, Last, CurSpeed), #speed{
last_v = CurVal last_v = CurVal,
}. last = short_mma(Last, CurSpeed),
last_medium = medium_mma(LastMedium, CurSpeed),
last_long = long_mma(LastLong, CurSpeed)
}.
%% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average %% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average
mma(WindowSize, LastSpeed, CurSpeed) -> mma(WindowSize, LastSpeed, CurSpeed) ->
(LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize. (LastSpeed * (WindowSize - 1) + CurSpeed) / WindowSize.
short_mma(LastSpeed, CurSpeed) ->
mma(?SPEED_AVERAGE_WINDOW_SIZE, LastSpeed, CurSpeed).
medium_mma(LastSpeed, CurSpeed) ->
mma(?SPEED_MEDIUM_WINDOW_SIZE, LastSpeed, CurSpeed).
long_mma(LastSpeed, CurSpeed) ->
mma(?SPEED_LONG_WINDOW_SIZE, LastSpeed, CurSpeed).

View File

@ -36,11 +36,13 @@ t_nonexistent_topic_metrics(_) ->
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in')),
emqx_mod_topic_metrics:register(<<"a/b/c">>), emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>), emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]). emqx_mod_topic_metrics:unload([]).
@ -57,6 +59,7 @@ t_topic_metrics(_) ->
?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), ?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
?assert(emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>), emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]). emqx_mod_topic_metrics:unload([]).
@ -89,4 +92,4 @@ t_hook(_) ->
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>), emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]). emqx_mod_topic_metrics:unload([]).