%% emqx/apps/emqx_modules/src/emqx_topic_metrics.erl
%%
%% 500 lines
%% 15 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_topic_metrics).
-behaviour(gen_server).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-export([ on_message_publish/1
, on_message_delivered/2
, on_message_dropped/3
]).
%% API functions
-export([ start_link/0
, stop/0
]).
-export([ enable/0
, disable/0
]).
-export([ max_limit/0]).
-export([ metrics/0
, metrics/1
, register/1
, deregister/1
, deregister_all/0
, is_registered/1
, all_registered_topics/0
, reset/0
, reset/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_info/2
, handle_cast/2
, terminate/2
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
%% Upper bound on the number of topics that may be registered at the same
%% time (exposed by max_limit/0 and enforced in do_register/2).
-define(MAX_TOPICS, 512).
%% ETS table holding one {Topic, Data} row per registered topic. Data is a map
%% with counter_ref and create_time, plus reset_time once the topic was reset.
-define(TAB, ?MODULE).
%% Metrics tracked for every registered topic. metric_idx/1 maps each one to
%% its slot (1..9) in the topic's counters array.
-define(TOPIC_METRICS,
['messages.in',
'messages.out',
'messages.qos0.in',
'messages.qos0.out',
'messages.qos1.in',
'messages.qos1.out',
'messages.qos2.in',
'messages.qos2.out',
'messages.dropped'
]).
%% Seconds between two `ticking' messages, i.e. between speed recalculations.
-define(TICKING_INTERVAL, 1).
%% Window sizes, counted in ticks, of the short / medium / long moving
%% averages (used by short_mma/2, medium_mma/2 and long_mma/2).
-define(SPEED_AVERAGE_WINDOW_SIZE, 5).
-define(SPEED_MEDIUM_WINDOW_SIZE, 60).
-define(SPEED_LONG_WINDOW_SIZE, 300).
%% Moving-average speed state of one {Topic, Metric} pair, updated every tick
%% by calculate_speed/2. Speeds are counter increments per second.
-record(speed, {
%% short-window moving average
last = 0 :: number(),
%% counter value seen at the previous tick
last_v = 0 :: number(),
%% medium-window moving average
last_medium = 0 :: number(),
%% long-window moving average
last_long = 0 :: number()
}).
%% gen_server state: one #speed{} per {Topic, Metric} key.
-record(state, {
speeds :: #{{binary(), atom()} => #speed{}}
}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% Maximum number of topics that may be registered for metrics at once.
max_limit() -> ?MAX_TOPICS.
%% Attach the publish / dropped / delivered hooks that feed the per-topic
%% counters. Returns the result of the last emqx_hooks:put/2 call.
enable() ->
    Hook = fun(Point, Callback) ->
                   emqx_hooks:put(Point, {?MODULE, Callback, []})
           end,
    Hook('message.publish', on_message_publish),
    Hook('message.dropped', on_message_dropped),
    Hook('message.delivered', on_message_delivered).

%% Detach the hooks, then deregister every topic through the server (which
%% also writes an empty topic_metrics config).
disable() ->
    Unhook = fun(Point, Callback) ->
                     emqx_hooks:del(Point, {?MODULE, Callback})
             end,
    Unhook('message.publish', on_message_publish),
    Unhook('message.dropped', on_message_dropped),
    Unhook('message.delivered', on_message_delivered),
    deregister_all().
%% 'message.publish' hook callback. For a registered topic, bump the total
%% inbound counter and then the inbound counter of the message's QoS level;
%% the result of that last increment is returned. Unregistered topics: ok.
on_message_publish(#message{topic = Topic, qos = QoS}) ->
    case is_registered(Topic) of
        false ->
            ok;
        true ->
            try_inc(Topic, 'messages.in'),
            PerQoS =
                case QoS of
                    ?QOS_0 -> 'messages.qos0.in';
                    ?QOS_1 -> 'messages.qos1.in';
                    ?QOS_2 -> 'messages.qos2.in'
                end,
            inc(Topic, PerQoS)
    end.
%% 'message.delivered' hook callback. For a registered topic, bump the total
%% outbound counter and then the outbound counter of the message's QoS level;
%% the result of that last increment is returned. Unregistered topics: ok.
on_message_delivered(_, #message{topic = Topic, qos = QoS}) ->
    case is_registered(Topic) of
        false ->
            ok;
        true ->
            try_inc(Topic, 'messages.out'),
            PerQoS =
                case QoS of
                    ?QOS_0 -> 'messages.qos0.out';
                    ?QOS_1 -> 'messages.qos1.out';
                    ?QOS_2 -> 'messages.qos2.out'
                end,
            inc(Topic, PerQoS)
    end.
%% 'message.dropped' hook callback: count the drop when the message's topic is
%% registered, otherwise return ok.
on_message_dropped(#message{topic = Topic}, _, _) ->
    case is_registered(Topic) of
        false -> ok;
        true -> inc(Topic, 'messages.dropped')
    end.
%% Start the locally registered server. init/1 receives the `topic_metrics'
%% config value (or [] when it is unset) as its single argument.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE,
                          [emqx_conf:get([topic_metrics], [])], []).

%% Stop the locally registered server.
stop() -> gen_server:stop(?MODULE).
%% Formatted metrics of every registered topic (see format/1).
metrics() ->
    lists:map(fun format/1, ets:tab2list(?TAB)).

%% Formatted metrics of one topic, or {error, topic_not_found}.
metrics(Topic) ->
    case ets:lookup(?TAB, Topic) of
        [Row] -> format(Row);
        [] -> {error, topic_not_found}
    end.
%% Ask the server to start tracking Topic. Replies ok or the {error, _}
%% produced by do_register/2.
register(Topic) when is_binary(Topic) ->
    Request = {register, Topic},
    gen_server:call(?MODULE, Request).

%% Ask the server to stop tracking Topic; {error, topic_not_found} if unknown.
deregister(Topic) when is_binary(Topic) ->
    Request = {deregister, Topic},
    gen_server:call(?MODULE, Request).

%% Ask the server to drop every registered topic.
deregister_all() ->
    Request = {deregister, all},
    gen_server:call(?MODULE, Request).
%% True when Topic has a row in ?TAB.
is_registered(Topic) -> ets:member(?TAB, Topic).

%% Topic names of all rows currently stored in ?TAB.
all_registered_topics() ->
    Rows = ets:tab2list(?TAB),
    [Name || {Name, _Info} <- Rows].
%% Zero the counters and speeds of one registered topic;
%% {error, topic_not_found} when the topic is not registered.
reset(Topic) ->
    case is_registered(Topic) of
        false -> {error, topic_not_found};
        true -> gen_server:call(?MODULE, {reset, Topic})
    end.

%% Zero the counters and speeds of every registered topic.
reset() -> gen_server:call(?MODULE, {reset, all}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%% gen_server init: create ?TAB, arm the first speed tick and register each
%% topic listed in Opts (a list of maps carrying a `topic' key).
%%
%% Duplicate entries are skipped. Exceeding the quota keeps the original
%% string error. Any other registration error (e.g. a wildcard topic) now
%% fails with a descriptive reason instead of an anonymous case_clause.
%% Note that do_register/2 also calls add_topic_config/1 for every topic.
init([Opts]) ->
    erlang:process_flag(trap_exit, true),
    ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
    erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
    Fun =
        fun(#{topic := Topic}, CurrentSpeeds) ->
            case do_register(Topic, CurrentSpeeds) of
                {ok, NSpeeds} ->
                    NSpeeds;
                {error, already_existed} ->
                    CurrentSpeeds;
                {error, quota_exceeded} ->
                    error("max topic metrics quota exceeded");
                {error, Reason} ->
                    %% e.g. bad_topic or {quota_exceeded, bad_topic}
                    error({bad_topic_metrics_config, Topic, Reason})
            end
        end,
    {ok, #state{speeds = lists:foldl(Fun, #{}, Opts)}, hibernate}.
%% gen_server calls.
%%  {register, Topic}       -> ok | {error, _} from do_register/2
%%  {deregister, all}       -> ok; empties ?TAB, the speeds and the config
%%  {deregister, Topic}     -> ok | {error, topic_not_found}
%%  {reset, all}            -> ok; zeroes every topic
%%  {reset, Topic}          -> ok | {error, topic_not_found}
%%  {get_rates, Topic, Met} -> #{short, medium, long}
%%                             | {error, topic_not_found | invalid_metric}
%% The server owns ?TAB, so a crash here loses every metric. For that reason
%% {reset, Topic} re-checks registration inside the server: a deregister can
%% race with the pre-check in reset/1. Unknown requests are logged and answered
%% with `ignored', matching handle_cast/2 and handle_info/2.
handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
    case do_register(Topic, Speeds) of
        {ok, NSpeeds} ->
            {reply, ok, State#state{speeds = NSpeeds}};
        Error ->
            {reply, Error, State}
    end;
handle_call({deregister, all}, _From, State) ->
    true = ets:delete_all_objects(?TAB),
    update_config([]),
    {reply, ok, State#state{speeds = #{}}};
handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
    case is_registered(Topic) of
        false ->
            {reply, {error, topic_not_found}, State};
        true ->
            true = ets:delete(?TAB, Topic),
            NSpeeds = lists:foldl(fun(Metric, Acc) ->
                                          maps:remove({Topic, Metric}, Acc)
                                  end, Speeds, ?TOPIC_METRICS),
            remove_topic_config(Topic),
            {reply, ok, State#state{speeds = NSpeeds}}
    end;
handle_call({reset, all}, _From, State = #state{speeds = Speeds}) ->
    NSpeeds = lists:foldl(fun reset_topic/2, Speeds, ets:tab2list(?TAB)),
    {reply, ok, State#state{speeds = NSpeeds}};
handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) ->
    case is_registered(Topic) of
        false ->
            {reply, {error, topic_not_found}, State};
        true ->
            {reply, ok, State#state{speeds = reset_topic(Topic, Speeds)}}
    end;
handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
    case is_registered(Topic) of
        false ->
            {reply, {error, topic_not_found}, State};
        true ->
            case maps:get({Topic, Metric}, Speeds, undefined) of
                undefined ->
                    {reply, {error, invalid_metric}, State};
                #speed{last = Short, last_medium = Medium, last_long = Long} ->
                    {reply, #{short => Short, medium => Medium, long => Long}, State}
            end
    end;
handle_call(Req, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", call => Req}),
    {reply, ignored, State}.
%% No casts are expected; log and ignore anything that arrives.
handle_cast(Req, St) ->
    ?SLOG(error, #{cast => Req, msg => "unexpected_cast"}),
    {noreply, St}.
%% `ticking': recompute every tracked moving-average speed from the current
%% counter value, then re-arm the timer for the next tick.
%%
%% maps:map/2 cannot delete keys. Returning maps:remove/2 from inside it
%% stored a whole map as the "speed" of a stale key. The map is therefore
%% rebuilt with maps:fold/3. Any key whose counter can no longer be read
%% (topic gone, or unknown metric) is dropped, because no speed can be
%% computed for it.
%%
%% Any other message is logged and ignored.
handle_info(ticking, State = #state{speeds = Speeds}) ->
    NSpeeds = maps:fold(
                fun({Topic, Metric} = Key, Speed, Acc) ->
                        case val(Topic, Metric) of
                            {error, _Reason} -> Acc;
                            Val -> Acc#{Key => calculate_speed(Val, Speed)}
                        end
                end, #{}, Speeds),
    erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
    {noreply, State#state{speeds = NSpeeds}};
handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.
%% No cleanup is performed on shutdown.
terminate(_Why, _St) -> ok.
%% Zero one topic and return the updated speeds map. The topic is given either
%% as its ?TAB row {Topic, Data} or as the bare topic name, in which case the
%% row is looked up and must exist.
%% Effects: every counter slot is set to 0, reset_time is stamped on the row,
%% and each {Topic, Metric} speed is replaced by a fresh #speed{}.
reset_topic({Topic, Data}, Speeds) ->
    ok = reset_counter(maps:get(counter_ref, Data)),
    Stamped = Data#{reset_time => emqx_rule_funcs:now_rfc3339()},
    true = ets:insert(?TAB, {Topic, Stamped}),
    Fresh = maps:from_list([{{Topic, M}, #speed{}} || M <- ?TOPIC_METRICS]),
    maps:merge(Speeds, Fresh);
reset_topic(Topic, Speeds) ->
    Row = hd(ets:lookup(?TAB, Topic)),
    reset_topic(Row, Speeds).
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
%% Register Topic if possible and return {ok, NewSpeeds}.
%% Errors:
%%   {error, already_existed}               - Topic already has a row in ?TAB
%%   {error, bad_topic}                     - Topic is a wildcard
%%   {error, quota_exceeded}                - ?MAX_TOPICS topics are registered
%%   {error, {quota_exceeded, bad_topic}}   - both of the above
%% On success a zeroed counters array and create_time are stored in ?TAB,
%% each metric gets a fresh #speed{}, and the topic is added to the config.
do_register(Topic, Speeds) ->
    case is_registered(Topic) of
        true ->
            {error, already_existed};
        false ->
            UnderQuota = number_of_registered_topics() < ?MAX_TOPICS,
            Wildcard = emqx_topic:wildcard(Topic),
            if
                UnderQuota andalso not Wildcard ->
                    Created = emqx_rule_funcs:now_rfc3339(),
                    Ref = counters:new(counters_size(), [write_concurrency]),
                    ok = reset_counter(Ref),
                    Row = {Topic, #{counter_ref => Ref, create_time => Created}},
                    true = ets:insert(?TAB, Row),
                    Seeded = maps:merge(Speeds, maps:from_list(
                                                  [{{Topic, M}, #speed{}}
                                                   || M <- ?TOPIC_METRICS])),
                    add_topic_config(Topic),
                    {ok, Seeded};
                UnderQuota ->
                    {error, bad_topic};
                not Wildcard ->
                    {error, quota_exceeded};
                true ->
                    {error, {quota_exceeded, bad_topic}}
            end
    end.
%% Turn a ?TAB row into the public metrics map:
%%   #{topic, metrics, create_time} plus reset_time when the topic was reset.
%% For every metric, `metrics' holds a `<metric>.count' counter value and a
%% `<metric>.rate' short-window speed rounded by emqx_rule_funcs:float/2.
%%
%% rate/2 issues gen_server:call(?MODULE, ...), so this must not run inside
%% the server process. The row is read by the caller before the call. If the
%% topic is deregistered in between, rate/2 returns {error, _}. That value is
%% reported as a zero rate instead of being handed to emqx_rule_funcs:float/2,
%% which is not expected to accept an error tuple.
format({Topic, Data}) ->
    CRef = maps:get(counter_ref, Data),
    Fun =
        fun(Key, Metrics) ->
            CounterKey = to_count(Key),
            Counter = counters:get(CRef, metric_idx(Key)),
            RateKey = to_rate(Key),
            RawRate =
                case rate(Topic, Key) of
                    {error, _Reason} -> 0;
                    R -> R
                end,
            Rate = emqx_rule_funcs:float(RawRate, 4),
            maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics))
        end,
    Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS),
    CreateTime = maps:get(create_time, Data),
    TopicMetrics = #{
        topic => Topic,
        metrics => Metrics,
        create_time => CreateTime
    },
    case maps:get(reset_time, Data, undefined) of
        undefined ->
            TopicMetrics;
        ResetTime ->
            TopicMetrics#{reset_time => ResetTime}
    end.
%% Remove #{<<"topic">> => Topic} from the raw topic_metrics config.
remove_topic_config(Topic) when is_binary(Topic) ->
    Current = emqx_config:get_raw([<<"topic_metrics">>], []),
    Entry = #{<<"topic">> => Topic},
    update_config(Current -- [Entry]).

%% Add #{<<"topic">> => Topic} to the raw topic_metrics config, keeping the
%% list sorted and free of duplicates.
add_topic_config(Topic) when is_binary(Topic) ->
    Current = emqx_config:get_raw([<<"topic_metrics">>], []),
    Entry = #{<<"topic">> => Topic},
    update_config(lists:usort(Current ++ [Entry])).

%% Write the topic_metrics config; crashes unless the update returns {ok, _}.
update_config(Topics) when is_list(Topics) ->
    {ok, _} = emqx:update_config([topic_metrics], Topics),
    ok.
%% Increment Metric of Topic by one, ignoring the outcome.
try_inc(Topic, Metric) ->
    _ = inc(Topic, Metric),
    ok.

%% Increment Metric of Topic by one.
inc(Topic, Metric) ->
    inc(Topic, Metric, 1).

%% Add Val to Metric of Topic. Returns ok from counters:add/3, or
%% {error, topic_not_found} / {error, invalid_metric}.
inc(Topic, Metric, Val) ->
    case get_counters(Topic) of
        {error, topic_not_found} = Missing ->
            Missing;
        Ref ->
            case metric_idx(Metric) of
                {error, invalid_metric} = BadMetric -> BadMetric;
                Slot -> counters:add(Ref, Slot, Val)
            end
    end.
%% Current counter value of Metric for Topic, or
%% {error, topic_not_found} / {error, invalid_metric}.
val(Topic, Metric) ->
    Rows = ets:lookup(?TAB, Topic),
    case Rows of
        [{Topic, Data}] ->
            Ref = maps:get(counter_ref, Data),
            case metric_idx(Metric) of
                {error, invalid_metric} = BadMetric -> BadMetric;
                Slot -> counters:get(Ref, Slot)
            end;
        [] ->
            {error, topic_not_found}
    end.
%% Short-window speed of Metric for Topic, fetched from the server, or the
%% server's {error, Reason}. Must not be called from inside this server's own
%% callbacks, since it performs gen_server:call(?MODULE, ...).
rate(Topic, Metric) ->
    Reply = gen_server:call(?MODULE, {get_rates, Topic, Metric}),
    case Reply of
        {error, _} = Error -> Error;
        #{short := Short} -> Short
    end.
%% Slot of a metric in a topic's counters array (1..9), or
%% {error, invalid_metric} for any other term.
metric_idx('messages.in')       -> 1;
metric_idx('messages.out')      -> 2;
metric_idx('messages.qos0.in')  -> 3;
metric_idx('messages.qos0.out') -> 4;
metric_idx('messages.qos1.in')  -> 5;
metric_idx('messages.qos1.out') -> 6;
metric_idx('messages.qos2.in')  -> 7;
metric_idx('messages.qos2.out') -> 8;
metric_idx('messages.dropped')  -> 9;
metric_idx(_Other)              -> {error, invalid_metric}.
%% Key under which a metric's counter value is reported by format/1.
to_count('messages.in')       -> 'messages.in.count';
to_count('messages.out')      -> 'messages.out.count';
to_count('messages.qos0.in')  -> 'messages.qos0.in.count';
to_count('messages.qos0.out') -> 'messages.qos0.out.count';
to_count('messages.qos1.in')  -> 'messages.qos1.in.count';
to_count('messages.qos1.out') -> 'messages.qos1.out.count';
to_count('messages.qos2.in')  -> 'messages.qos2.in.count';
to_count('messages.qos2.out') -> 'messages.qos2.out.count';
to_count('messages.dropped')  -> 'messages.dropped.count'.
%% Key under which a metric's short-window rate is reported by format/1.
to_rate('messages.in')       -> 'messages.in.rate';
to_rate('messages.out')      -> 'messages.out.rate';
to_rate('messages.qos0.in')  -> 'messages.qos0.in.rate';
to_rate('messages.qos0.out') -> 'messages.qos0.out.rate';
to_rate('messages.qos1.in')  -> 'messages.qos1.in.rate';
to_rate('messages.qos1.out') -> 'messages.qos1.out.rate';
to_rate('messages.qos2.in')  -> 'messages.qos2.in.rate';
to_rate('messages.qos2.out') -> 'messages.qos2.out.rate';
to_rate('messages.dropped')  -> 'messages.dropped.rate'.
%% Set every slot of the counters array CRef to zero; returns ok.
reset_counter(CRef) ->
    lists:foreach(fun(Slot) -> counters:put(CRef, Slot, 0) end,
                  lists:seq(1, counters_size())).

%% Counters reference stored for Topic, or {error, topic_not_found}.
get_counters(Topic) ->
    case ets:lookup(?TAB, Topic) of
        [{Topic, Data}] -> maps:get(counter_ref, Data);
        [] -> {error, topic_not_found}
    end.

%% Number of slots per counters array: one per tracked metric.
counters_size() -> length(?TOPIC_METRICS).
%% Number of rows (registered topics) currently in ?TAB.
%% ets:info/2 reads only the `size' item, instead of building the full info
%% list with ets:info/1 and then searching it with proplists.
number_of_registered_topics() ->
    ets:info(?TAB, size).
%% Fold the counter value read at this tick into a #speed{}. The instantaneous
%% speed is the counter delta since the previous tick divided by
%% ?TICKING_INTERVAL (seconds). It is blended into the short, medium and long
%% moving averages, and the new counter value becomes last_v.
calculate_speed(Now, #speed{last = Short,
                            last_v = Prev,
                            last_medium = Medium,
                            last_long = Long}) ->
    Instant = (Now - Prev) / ?TICKING_INTERVAL,
    #speed{last_v = Now,
           last = short_mma(Short, Instant),
           last_medium = medium_mma(Medium, Instant),
           last_long = long_mma(Long, Instant)}.
%% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average
%% New average over a window of N samples: the previous average keeps weight
%% N - 1 and the new sample gets weight 1. The result is always a float.
mma(N, Prev, Sample) ->
    Weighted = Prev * (N - 1) + Sample,
    Weighted / N.
%% Moving averages over the short, medium and long windows (sizes in ticks).
short_mma(Prev, Sample) -> mma(?SPEED_AVERAGE_WINDOW_SIZE, Prev, Sample).

medium_mma(Prev, Sample) -> mma(?SPEED_MEDIUM_WINDOW_SIZE, Prev, Sample).

long_mma(Prev, Sample) -> mma(?SPEED_LONG_WINDOW_SIZE, Prev, Sample).