%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% Per-topic message metrics: a gen_server that keeps one counters array
%% per registered topic in an ETS table and derives moving-average rates
%% from those counters on a periodic `ticking' message.
-module(emqx_topic_metrics).

-behaviour(gen_server).

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").

%% Hook callbacks installed by enable/0 and removed by disable/0
-export([ on_message_publish/1
        , on_message_delivered/2
        , on_message_dropped/3
        ]).

%% API functions
-export([ start_link/0
        , stop/0
        ]).

-export([ enable/0
        , disable/0
        ]).

-export([ max_limit/0]).

-export([ metrics/0
        , metrics/1
        , register/1
        , deregister/1
        , deregister_all/0
        , is_registered/1
        , all_registered_topics/0
        , reset/0
        , reset/1
        ]).

%% gen_server callbacks
-export([ init/1
        , handle_call/3
        , handle_info/2
        , handle_cast/2
        , terminate/2
        ]).

-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.

%% Maximum number of topics that may be registered at once
%% (checked in do_register/2, reported by max_limit/0).
-define(MAX_TOPICS, 512).

%% Name of the ETS table holding `{Topic, Data}' entries; same as the module name.
-define(TAB, ?MODULE).

%% Metrics tracked for every registered topic. The length of this list is
%% the size of each topic's counters array (see counters_size/0) and each
%% name is mapped to a slot by metric_idx/1.
-define(TOPIC_METRICS,
        ['messages.in',
         'messages.out',
         'messages.qos0.in',
         'messages.qos0.out',
         'messages.qos1.in',
         'messages.qos1.out',
         'messages.qos2.in',
         'messages.qos2.out',
         'messages.dropped'
        ]).

%% Seconds between two `ticking' messages (passed through timer:seconds/1).
-define(TICKING_INTERVAL, 1).

%% Window sizes, in number of ticks, of the short, medium and long
%% modified moving averages computed by short_mma/2, medium_mma/2 and
%% long_mma/2.
-define(SPEED_AVERAGE_WINDOW_SIZE, 5).
-define(SPEED_MEDIUM_WINDOW_SIZE, 60).
-define(SPEED_LONG_WINDOW_SIZE, 300).

%% Rate bookkeeping for one {Topic, Metric} pair, updated by calculate_speed/2.
-record(speed, {
            %% short-window moving average of the rate
            last = 0 :: number(),
            %% counter value observed at the previous tick
            last_v = 0 :: number(),
            %% medium-window moving average of the rate
            last_medium = 0 :: number(),
            %% long-window moving average of the rate
            last_long = 0 :: number()
         }).
%% gen_server state: one #speed{} record per {Topic, Metric} pair.
-record(state, {
            speeds :: #{{binary(), atom()} => #speed{}}
        }).

%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------

%% Returns the maximum number of topics that can be registered.
max_limit() ->
    ?MAX_TOPICS.

%% Installs this module's callbacks on the publish, dropped and delivered
%% hook points. Returns whatever the last emqx_hooks:put/2 call returns.
enable() ->
    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
    emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}),
    emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}).

%% Removes the three hook callbacks and then deregisters every topic.
disable() ->
    emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
    emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}),
    emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}),
    deregister_all().

%% 'message.publish' hook: for a registered topic, bumps 'messages.in'
%% and the QoS-specific inbound counter. Unregistered topics are ignored.
on_message_publish(#message{topic = Topic, qos = QoS}) ->
    case is_registered(Topic) of
        false ->
            ok;
        true ->
            try_inc(Topic, 'messages.in'),
            QoSMetric = case QoS of
                            ?QOS_0 -> 'messages.qos0.in';
                            ?QOS_1 -> 'messages.qos1.in';
                            ?QOS_2 -> 'messages.qos2.in'
                        end,
            inc(Topic, QoSMetric)
    end.

%% 'message.delivered' hook: for a registered topic, bumps 'messages.out'
%% and the QoS-specific outbound counter. The first argument is unused.
on_message_delivered(_, #message{topic = Topic, qos = QoS}) ->
    case is_registered(Topic) of
        false ->
            ok;
        true ->
            try_inc(Topic, 'messages.out'),
            QoSMetric = case QoS of
                            ?QOS_0 -> 'messages.qos0.out';
                            ?QOS_1 -> 'messages.qos1.out';
                            ?QOS_2 -> 'messages.qos2.out'
                        end,
            inc(Topic, QoSMetric)
    end.

%% 'message.dropped' hook: bumps 'messages.dropped' for a registered topic.
%% The second and third arguments are unused.
on_message_dropped(#message{topic = Topic}, _, _) ->
    case is_registered(Topic) of
        false -> ok;
        true -> inc(Topic, 'messages.dropped')
    end.

%% Starts the server, registered locally under the module name, passing
%% the configured `topic_metrics' list (default []) to init/1.
start_link() ->
    gen_server:start_link({local, ?MODULE},
                          ?MODULE,
                          [emqx_conf:get([topic_metrics], [])],
                          []).

%% Stops the locally registered server.
stop() ->
    gen_server:stop(?MODULE).

%% Returns the formatted metrics of every registered topic.
metrics() ->
    lists:map(fun format/1, ets:tab2list(?TAB)).

%% Returns the formatted metrics of one topic, or
%% {error, topic_not_found} when the topic is not registered.
metrics(Topic) ->
    case ets:lookup(?TAB, Topic) of
        [Entry] -> format(Entry);
        [] -> {error, topic_not_found}
    end.

%% Asks the server to start tracking Topic (binary only).
register(Topic) when is_binary(Topic) ->
    gen_server:call(?MODULE, {register, Topic}).

%% Asks the server to stop tracking Topic (binary only).
deregister(Topic) when is_binary(Topic) ->
    gen_server:call(?MODULE, {deregister, Topic}).
%% Asks the server to drop every registered topic.
deregister_all() ->
    gen_server:call(?MODULE, {deregister, all}).

%% True when Topic has an entry in the metrics table.
is_registered(Topic) ->
    ets:member(?TAB, Topic).

%% Lists the names of all registered topics.
all_registered_topics() ->
    Entries = ets:tab2list(?TAB),
    [Name || {Name, _} <- Entries].

%% Resets the counters of one topic; {error, topic_not_found} when the
%% topic is not registered at the time of the check.
reset(Topic) ->
    case is_registered(Topic) of
        false -> {error, topic_not_found};
        true -> gen_server:call(?MODULE, {reset, Topic})
    end.

%% Resets the counters of every registered topic.
reset() ->
    gen_server:call(?MODULE, {reset, all}).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

%% Creates the metrics table, schedules the first `ticking' message and
%% registers every topic listed in Opts. Starts hibernated.
init([Opts]) ->
    erlang:process_flag(trap_exit, true),
    ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
    erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
    InitialSpeeds = lists:foldl(fun register_configured_topic/2, #{}, Opts),
    {ok, #state{speeds = InitialSpeeds}, hibernate}.

%% Fold step for init/1: registers one configured topic. A duplicate is
%% skipped; exceeding the quota aborts start-up.
register_configured_topic(#{topic := Topic}, Speeds) ->
    case do_register(Topic, Speeds) of
        {ok, Updated} ->
            Updated;
        {error, already_existed} ->
            Speeds;
        {error, quota_exceeded} ->
            error("max topic metrics quota exceeded")
    end.
%% Synchronous requests.
%%
%% {register, Topic}          -> ok | {error, Reason} as returned by do_register/2
%% {deregister, all}          -> ok; clears the table, the config and all speeds
%% {deregister, Topic}        -> ok | {error, topic_not_found}
%% {reset, all}               -> ok; zeroes counters and speeds of every topic
%% {reset, Topic}             -> ok | {error, topic_not_found}
%% {get_rates, Topic, Metric} -> #{short, medium, long}
%%                               | {error, topic_not_found | invalid_metric}
handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
    case do_register(Topic, Speeds) of
        {ok, NSpeeds} ->
            {reply, ok, State#state{speeds = NSpeeds}};
        Error ->
            {reply, Error, State}
    end;

handle_call({deregister, all}, _From, State) ->
    true = ets:delete_all_objects(?TAB),
    update_config([]),
    {reply, ok, State#state{speeds = #{}}};

handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) ->
    case is_registered(Topic) of
        false ->
            {reply, {error, topic_not_found}, State};
        true ->
            true = ets:delete(?TAB, Topic),
            %% Drop the speed entry of every metric of this topic.
            NSpeeds = lists:foldl(fun(Metric, Acc) ->
                                          maps:remove({Topic, Metric}, Acc)
                                  end, Speeds, ?TOPIC_METRICS),
            remove_topic_config(Topic),
            {reply, ok, State#state{speeds = NSpeeds}}
    end;

handle_call({reset, all}, _From, State = #state{speeds = Speeds}) ->
    Fun = fun(T, NSpeeds) -> reset_topic(T, NSpeeds) end,
    {reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}};

handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) ->
    %% reset/1 checks registration in the caller's process, so the topic may
    %% have been deregistered by the time this request is served. Look the
    %% entry up here and answer with an error instead of crashing the server.
    case ets:lookup(?TAB, Topic) of
        [] ->
            {reply, {error, topic_not_found}, State};
        [TopicMetrics] ->
            NSpeeds = reset_topic(TopicMetrics, Speeds),
            {reply, ok, State#state{speeds = NSpeeds}}
    end;

handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
    case is_registered(Topic) of
        false ->
            {reply, {error, topic_not_found}, State};
        true ->
            case maps:get({Topic, Metric}, Speeds, undefined) of
                undefined ->
                    {reply, {error, invalid_metric}, State};
                #speed{last = Short, last_medium = Medium, last_long = Long} ->
                    {reply, #{short => Short, medium => Medium, long => Long}, State}
            end
    end.

%% No casts are part of the protocol; log and ignore anything received.
handle_cast(Msg, State) ->
    ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
    {noreply, State}.
%% `ticking': recompute the speed of every {Topic, Metric} pair from the
%% current counter value, then schedule the next tick.
%% Anything else is logged and ignored.
handle_info(ticking, State = #state{speeds = Speeds}) ->
    %% maps:fold/3 rather than maps:map/2: map/2 cannot drop a key, so an
    %% entry whose topic has vanished from the table must simply not be
    %% carried over into the new map.
    NSpeeds = maps:fold(
                fun({Topic, Metric} = Key, Speed, Acc) ->
                        case val(Topic, Metric) of
                            {error, topic_not_found} ->
                                Acc;
                            Val when is_integer(Val) ->
                                Acc#{Key => calculate_speed(Val, Speed)}
                        end
                end, #{}, Speeds),
    erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
    {noreply, State#state{speeds = NSpeeds}};

handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

%% Zeroes the counters of one topic, stamps `reset_time' on its table entry
%% and replaces the speed of each of its metrics with a fresh #speed{}.
%% Accepts either a `{Topic, Data}' table entry or a bare topic name; the
%% bare-name form fails if the topic has no table entry.
%% Returns the updated speeds map.
reset_topic({Topic, Data}, Speeds) ->
    CRef = maps:get(counter_ref, Data),
    ok = reset_counter(CRef),
    ResetTime = emqx_rule_funcs:now_rfc3339(),
    true = ets:insert(?TAB, {Topic, Data#{reset_time => ResetTime}}),
    Fun =
        fun(Metric, CurrentSpeeds) ->
                maps:put({Topic, Metric}, #speed{}, CurrentSpeeds)
        end,
    lists:foldl(Fun, Speeds, ?TOPIC_METRICS);

reset_topic(Topic, Speeds) ->
    T = hd(ets:lookup(?TAB, Topic)),
    reset_topic(T, Speeds).

%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------

%% Registers Topic: creates its counters, inserts the table entry, adds a
%% fresh #speed{} per metric and appends the topic to the stored config.
%%
%% Returns {ok, NewSpeeds}, or {error, Reason} where Reason is
%% already_existed | bad_topic (wildcard) | quota_exceeded
%% | {quota_exceeded, bad_topic}.
do_register(Topic, Speeds) ->
    case is_registered(Topic) of
        true ->
            {error, already_existed};
        false ->
            case {number_of_registered_topics() < ?MAX_TOPICS, emqx_topic:wildcard(Topic)} of
                {true, false} ->
                    CreateTime = emqx_rule_funcs:now_rfc3339(),
                    CRef = counters:new(counters_size(), [write_concurrency]),
                    ok = reset_counter(CRef),
                    Data = #{ counter_ref => CRef,
                              create_time => CreateTime},
                    true = ets:insert(?TAB, {Topic, Data}),
                    NSpeeds = lists:foldl(fun(Metric, Acc) ->
                                                  maps:put({Topic, Metric}, #speed{}, Acc)
                                          end, Speeds, ?TOPIC_METRICS),
                    add_topic_config(Topic),
                    {ok, NSpeeds};
                {true, true} ->
                    {error, bad_topic};
                {false, false} ->
                    {error, quota_exceeded};
                {false, true} ->
                    {error, {quota_exceeded, bad_topic}}
            end
    end.
%% Turns a `{Topic, Data}' table entry into the externally visible map:
%% topic, create_time, a `metrics' map holding a `.count' and a `.rate'
%% key per metric, and `reset_time' when the entry carries one.
format({Topic, Data}) ->
    CRef = maps:get(counter_ref, Data),
    AddMetric =
        fun(Metric, Acc) ->
                CountKey = to_count(Metric),
                Count = counters:get(CRef, metric_idx(Metric)),
                RateKey = to_rate(Metric),
                Rate = emqx_rule_funcs:float(rate(Topic, Metric), 4),
                Acc#{CountKey => Count, RateKey => Rate}
        end,
    Metrics = lists:foldl(AddMetric, #{}, ?TOPIC_METRICS),
    CreateTime = maps:get(create_time, Data),
    Formatted = #{ topic => Topic
                 , metrics => Metrics
                 , create_time => CreateTime
                 },
    case maps:get(reset_time, Data, undefined) of
        undefined -> Formatted;
        ResetTime -> Formatted#{reset_time => ResetTime}
    end.

%% Removes Topic's entry from the raw `topic_metrics' config and stores the result.
remove_topic_config(Topic) when is_binary(Topic) ->
    Current = emqx_config:get_raw([<<"topic_metrics">>], []),
    Remaining = Current -- [#{<<"topic">> => Topic}],
    update_config(Remaining).

%% Appends Topic's entry to the raw `topic_metrics' config and stores the
%% sorted, de-duplicated result.
add_topic_config(Topic) when is_binary(Topic) ->
    Current = emqx_config:get_raw([<<"topic_metrics">>], []),
    Extended = Current ++ [#{<<"topic">> => Topic}],
    update_config(lists:usort(Extended)).

%% Stores Topics as the new `topic_metrics' config; fails on any non-ok result.
update_config(Topics) when is_list(Topics) ->
    {ok, _} = emqx:update_config([topic_metrics], Topics),
    ok.

%% Like inc/2 but always returns ok, whatever inc/2 returned.
try_inc(Topic, Metric) ->
    _ = inc(Topic, Metric),
    ok.

%% Adds 1 to Metric's counter for Topic.
inc(Topic, Metric) ->
    inc(Topic, Metric, 1).

%% Adds Val to Metric's counter for Topic. Returns the result of
%% counters:add/3, {error, topic_not_found} or {error, invalid_metric};
%% an unknown topic takes precedence over an unknown metric.
inc(Topic, Metric, Val) ->
    case {get_counters(Topic), metric_idx(Metric)} of
        {{error, topic_not_found} = NotFound, _} ->
            NotFound;
        {_, {error, invalid_metric} = Invalid} ->
            Invalid;
        {CRef, Idx} ->
            counters:add(CRef, Idx, Val)
    end.

%% Reads the current value of Metric's counter for Topic, or returns
%% {error, topic_not_found} / {error, invalid_metric}.
val(Topic, Metric) ->
    case ets:lookup(?TAB, Topic) of
        [{Topic, Data}] ->
            CRef = maps:get(counter_ref, Data),
            case metric_idx(Metric) of
                Idx when is_integer(Idx) ->
                    counters:get(CRef, Idx);
                {error, invalid_metric} = Invalid ->
                    Invalid
            end;
        [] ->
            {error, topic_not_found}
    end.

%% Asks the server for the rates of {Topic, Metric} and returns the
%% short-window one, or the server's {error, Reason}.
rate(Topic, Metric) ->
    case gen_server:call(?MODULE, {get_rates, Topic, Metric}) of
        {error, Reason} -> {error, Reason};
        #{short := Short} -> Short
    end.
%% Maps a metric name to its 1-based slot in a topic's counters array;
%% any other term yields {error, invalid_metric}.
metric_idx(Metric) ->
    case Metric of
        'messages.in'       -> 1;
        'messages.out'      -> 2;
        'messages.qos0.in'  -> 3;
        'messages.qos0.out' -> 4;
        'messages.qos1.in'  -> 5;
        'messages.qos1.out' -> 6;
        'messages.qos2.in'  -> 7;
        'messages.qos2.out' -> 8;
        'messages.dropped'  -> 9;
        _                   -> {error, invalid_metric}
    end.

%% Name under which a metric's counter value is reported by format/1.
to_count('messages.in')       -> 'messages.in.count';
to_count('messages.out')      -> 'messages.out.count';
to_count('messages.qos0.in')  -> 'messages.qos0.in.count';
to_count('messages.qos0.out') -> 'messages.qos0.out.count';
to_count('messages.qos1.in')  -> 'messages.qos1.in.count';
to_count('messages.qos1.out') -> 'messages.qos1.out.count';
to_count('messages.qos2.in')  -> 'messages.qos2.in.count';
to_count('messages.qos2.out') -> 'messages.qos2.out.count';
to_count('messages.dropped')  -> 'messages.dropped.count'.

%% Name under which a metric's rate is reported by format/1.
to_rate('messages.in')       -> 'messages.in.rate';
to_rate('messages.out')      -> 'messages.out.rate';
to_rate('messages.qos0.in')  -> 'messages.qos0.in.rate';
to_rate('messages.qos0.out') -> 'messages.qos0.out.rate';
to_rate('messages.qos1.in')  -> 'messages.qos1.in.rate';
to_rate('messages.qos1.out') -> 'messages.qos1.out.rate';
to_rate('messages.qos2.in')  -> 'messages.qos2.in.rate';
to_rate('messages.qos2.out') -> 'messages.qos2.out.rate';
to_rate('messages.dropped')  -> 'messages.dropped.rate'.

%% Writes 0 into every slot of the counters array.
reset_counter(CRef) ->
    Slots = lists:seq(1, counters_size()),
    lists:foreach(fun(Idx) -> counters:put(CRef, Idx, 0) end, Slots).

%% Returns the counters reference stored for Topic, or
%% {error, topic_not_found} when the topic has no table entry.
get_counters(Topic) ->
    case ets:lookup(?TAB, Topic) of
        [{Topic, Data}] -> maps:get(counter_ref, Data);
        [] -> {error, topic_not_found}
    end.

%% Number of slots in a topic's counters array: one per tracked metric.
counters_size() ->
    length(?TOPIC_METRICS).

%% Number of entries currently in the metrics table.
number_of_registered_topics() ->
    TableInfo = ets:info(?TAB),
    proplists:get_value(size, TableInfo).
%% Produces the next #speed{} for a metric from its current counter value
%% and the #speed{} recorded at the previous tick.
calculate_speed(CurVal, #speed{last_v = PrevVal,
                               last = PrevShort,
                               last_medium = PrevMedium,
                               last_long = PrevLong}) ->
    %% instantaneous rate: counter growth since the previous tick
    Delta = CurVal - PrevVal,
    CurSpeed = Delta / ?TICKING_INTERVAL,
    Short = short_mma(PrevShort, CurSpeed),
    Medium = medium_mma(PrevMedium, CurSpeed),
    Long = long_mma(PrevLong, CurSpeed),
    #speed{last_v = CurVal,
           last = Short,
           last_medium = Medium,
           last_long = Long}.

%% Modified Moving Average ref: https://en.wikipedia.org/wiki/Moving_average
%% Blends the previous average with the newest sample over WindowSize samples.
mma(WindowSize, LastSpeed, CurSpeed) ->
    Carried = LastSpeed * (WindowSize - 1),
    (Carried + CurSpeed) / WindowSize.

%% Moving average over the short window.
short_mma(LastSpeed, CurSpeed) ->
    mma(?SPEED_AVERAGE_WINDOW_SIZE, LastSpeed, CurSpeed).

%% Moving average over the medium window.
medium_mma(LastSpeed, CurSpeed) ->
    mma(?SPEED_MEDIUM_WINDOW_SIZE, LastSpeed, CurSpeed).

%% Moving average over the long window.
long_mma(LastSpeed, CurSpeed) ->
    mma(?SPEED_LONG_WINDOW_SIZE, LastSpeed, CurSpeed).