diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index 7b6f60cf1..3bbf8b52c 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -24,7 +24,7 @@ event_message { } topic_metrics { - topics = ["topic/#"] + topics = [] } rewrite { diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index fa226c9a0..3a7e5e3c0 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -37,12 +37,17 @@ , disable/0 ]). --export([ metrics/1 +-export([ max_limit/0]). + +-export([ metrics/0 + , metrics/1 , register/1 - , unregister/1 - , unregister_all/0 + , deregister/1 + , deregister_all/0 , is_registered/1 , all_registered_topics/0 + , reset/0 + , reset/1 ]). %% gen_server callbacks @@ -92,6 +97,9 @@ %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ +max_limit() -> + ?MAX_TOPICS. + enable() -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}), @@ -100,7 +108,8 @@ enable() -> disable() -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}), - emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}). + emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}), + deregister_all(). on_message_publish(#message{topic = Topic, qos = QoS}) -> case is_registered(Topic) of @@ -143,77 +152,101 @@ start_link() -> stop() -> gen_server:stop(?MODULE). +metrics() -> + [format(TopicMetrics) || TopicMetrics <- ets:tab2list(?TAB)]. + metrics(Topic) -> case ets:lookup(?TAB, Topic) of [] -> {error, topic_not_found}; - [{Topic, CRef}] -> - lists:foldl(fun(Metric, Acc) -> - [{to_count(Metric), counters:get(CRef, metric_idx(Metric))}, - {to_rate(Metric), rate(Topic, Metric)} | Acc] - end, [], ?TOPIC_METRICS) + [TopicMetrics] -> + format(TopicMetrics) end. register(Topic) when is_binary(Topic) -> gen_server:call(?MODULE, {register, Topic}). -unregister(Topic) when is_binary(Topic) -> - gen_server:call(?MODULE, {unregister, Topic}). +deregister(Topic) when is_binary(Topic) -> + gen_server:call(?MODULE, {deregister, Topic}). -unregister_all() -> - gen_server:call(?MODULE, {unregister, all}). +deregister_all() -> + gen_server:call(?MODULE, {deregister, all}). is_registered(Topic) -> ets:member(?TAB, Topic). all_registered_topics() -> - [Topic || {Topic, _CRef} <- ets:tab2list(?TAB)]. + [Topic || {Topic, _} <- ets:tab2list(?TAB)]. + +reset(Topic) -> + case is_registered(Topic) of + true -> + gen_server:call(?MODULE, {reset, Topic}); + false -> + {error, topic_not_found} + end. + +reset() -> + gen_server:call(?MODULE, {reset, all}). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([_Opts]) -> +init([Opts]) -> erlang:process_flag(trap_exit, true), ok = emqx_tables:new(?TAB, [{read_concurrency, true}]), erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), - {ok, #state{speeds = #{}}, hibernate}. + Fun = + fun(Topic, CurrentSpeeds) -> + case do_register(Topic, CurrentSpeeds) of + {ok, NSpeeds} -> + NSpeeds; + {error, already_existed} -> + CurrentSpeeds; + {error, quota_exceeded} -> + error("max topic metrics quota exceeded") + end + end, + {ok, #state{speeds = lists:foldl(Fun, #{}, maps:get(topics, Opts, []))}, hibernate}. handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) -> - case is_registered(Topic) of - true -> - {reply, {error, already_existed}, State}; - false -> - case number_of_registered_topics() < ?MAX_TOPICS of - true -> - CRef = counters:new(counters_size(), [write_concurrency]), - true = ets:insert(?TAB, {Topic, CRef}), - [counters:put(CRef, Idx, 0) || Idx <- lists:seq(1, counters_size())], - NSpeeds = lists:foldl(fun(Metric, Acc) -> - maps:put({Topic, Metric}, #speed{}, Acc) - end, Speeds, ?TOPIC_METRICS), - {reply, ok, State#state{speeds = NSpeeds}}; - false -> - {reply, {error, quota_exceeded}, State} - end + case do_register(Topic, Speeds) of + {ok, NSpeeds} -> + {reply, ok, State#state{speeds = NSpeeds}}; + Error -> + {reply, Error, State} end; -handle_call({unregister, all}, _From, State) -> - [delete_counters(Topic) || {Topic, _CRef} <- ets:tab2list(?TAB)], +handle_call({deregister, all}, _From, State) -> + true = ets:delete_all_objects(?TAB), + update_config([]), {reply, ok, State#state{speeds = #{}}}; -handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) -> +handle_call({deregister, Topic}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> {reply, ok, State}; true -> - ok = delete_counters(Topic), + true = ets:delete(?TAB, Topic), NSpeeds = lists:foldl(fun(Metric, Acc) -> maps:remove({Topic, Metric}, Acc) end, Speeds, ?TOPIC_METRICS), + remove_topic_config(Topic), {reply, ok, State#state{speeds = NSpeeds}} end; +handle_call({reset, all}, _From, State = #state{speeds = Speeds}) -> + Fun = + fun(T, NSpeeds) -> + reset_topic(T, NSpeeds) + end, + {reply, ok, State#state{speeds = lists:foldl(Fun, Speeds, ets:tab2list(?TAB))}}; + +handle_call({reset, Topic}, _From, State = #state{speeds = Speeds}) -> + NSpeeds = reset_topic(Topic, Speeds), + {reply, ok, State#state{speeds = NSpeeds}}; + handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) -> case is_registered(Topic) of false -> @@ -249,9 +282,83 @@ handle_info(Info, State) -> terminate(_Reason, _State) -> ok. +reset_topic({Topic, Data}, Speeds) -> + CRef = maps:get(counter_ref, Data), + ok = reset_counter(CRef), + ResetTime = emqx_rule_funcs:now_rfc3339(), + true = ets:insert(?TAB, {Topic, Data#{reset_time => ResetTime}}), + Fun = + fun(Metric, CurrentSpeeds) -> + maps:put({Topic, Metric}, #speed{}, CurrentSpeeds) + end, + lists:foldl(Fun, Speeds, ?TOPIC_METRICS); +reset_topic(Topic, Speeds) -> + T = hd(ets:lookup(?TAB, Topic)), + reset_topic(T, Speeds). + %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ +do_register(Topic, Speeds) -> + case is_registered(Topic) of + true -> + {error, already_existed}; + false -> + case number_of_registered_topics() < ?MAX_TOPICS of + true -> + CreateTime = emqx_rule_funcs:now_rfc3339(), + CRef = counters:new(counters_size(), [write_concurrency]), + ok = reset_counter(CRef), + Data = #{ + counter_ref => CRef, + create_time => CreateTime}, + true = ets:insert(?TAB, {Topic, Data}), + NSpeeds = lists:foldl(fun(Metric, Acc) -> + maps:put({Topic, Metric}, #speed{}, Acc) + end, Speeds, ?TOPIC_METRICS), + add_topic_config(Topic), + {ok, NSpeeds}; + false -> + {error, quota_exceeded} + end + end. + +format({Topic, Data}) -> + CRef = maps:get(counter_ref, Data), + Fun = + fun(Key, Metrics) -> + CounterKey = to_count(Key), + Counter = counters:get(CRef, metric_idx(Key)), + RateKey = to_rate(Key), + Rate = emqx_rule_funcs:float(rate(Topic, Key), 4), + maps:put(RateKey, Rate, maps:put(CounterKey, Counter, Metrics)) + end, + Metrics = lists:foldl(Fun, #{}, ?TOPIC_METRICS), + CreateTime = maps:get(create_time, Data), + TopicMetrics = #{ + topic => Topic, + metrics => Metrics, + create_time => CreateTime + }, + case maps:get(reset_time, Data, undefined) of + undefined -> + TopicMetrics; + ResetTime -> + TopicMetrics#{reset_time => ResetTime} + end. + +remove_topic_config(Topic) when is_binary(Topic) -> + Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) -- [Topic], + update_config(Topics). + +add_topic_config(Topic) when is_binary(Topic) -> + Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) ++ [Topic], + update_config(Topics). + +update_config(Topics) when is_list(Topics) -> + Opts = emqx_config:get_raw([<<"topic_metrics">>], #{}), + {ok, _} = emqx:update_config([topic_metrics], maps:put(<<"topics">>, Topics, Opts)), + ok. try_inc(Topic, Metric) -> _ = inc(Topic, Metric), @@ -277,7 +384,8 @@ val(Topic, Metric) -> case ets:lookup(?TAB, Topic) of [] -> {error, topic_not_found}; - [{Topic, CRef}] -> + [{Topic, Data}] -> + CRef = maps:get(counter_ref, Data), case metric_idx(Metric) of {error, invalid_metric} -> {error, invalid_metric}; @@ -344,14 +452,14 @@ to_rate('messages.qos2.out') -> to_rate('messages.dropped') -> 'messages.dropped.rate'. -delete_counters(Topic) -> - true = ets:delete(?TAB, Topic), +reset_counter(CRef) -> + [counters:put(CRef, Idx, 0) || Idx <- lists:seq(1, counters_size())], ok. get_counters(Topic) -> case ets:lookup(?TAB, Topic) of [] -> {error, topic_not_found}; - [{Topic, CRef}] -> CRef + [{Topic, Data}] -> maps:get(counter_ref, Data) end. counters_size() -> diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl index 1a2365703..1f16b3759 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics_api.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -13,195 +13,241 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- - +%% TODO: refactor uri path -module(emqx_topic_metrics_api). -% -rest_api(#{name => list_all_topic_metrics, -% method => 'GET', -% path => "/topic-metrics", -% func => list, -% descr => "A list of all topic metrics of all nodes in the cluster"}). +-behavior(minirest_api). -% -rest_api(#{name => list_topic_metrics, -% method => 'GET', -% path => "/topic-metrics/:bin:topic", -% func => list, -% descr => "A list of specfied topic metrics of all nodes in the cluster"}). +-import(emqx_mgmt_util, [ request_body_schema/1 + , response_schema/1 + , response_schema/2 + , response_array_schema/2 + , response_error_schema/2 + ]). -% -rest_api(#{name => register_topic_metrics, -% method => 'POST', -% path => "/topic-metrics", -% func => register, -% descr => "Register topic metrics"}). +-export([api_spec/0]). -% -rest_api(#{name => unregister_all_topic_metrics, -% method => 'DELETE', -% path => "/topic-metrics", -% func => unregister, -% descr => "Unregister all topic metrics"}). +-export([ list_topic/2 + , list_topic_metrics/2 + , operate_topic_metrics/2 + , reset_all_topic_metrics/2 + , reset_topic_metrics/2 + ]). -% -rest_api(#{name => unregister_topic_metrics, -% method => 'DELETE', -% path => "/topic-metrics/:bin:topic", -% func => unregister, -% descr => "Unregister topic metrics"}). +-define(ERROR_TOPIC, 'ERROR_TOPIC'). -% -export([ list/2 -% , register/2 -% , unregister/2 -% ]). +-define(EXCEED_LIMIT, 'EXCEED_LIMIT'). -% -export([ get_topic_metrics/2 -% , register_topic_metrics/2 -% , unregister_topic_metrics/2 -% , unregister_all_topic_metrics/1 -% ]). +-define(BAD_REQUEST, 'BAD_REQUEST'). -% list(#{topic := Topic0}, _Params) -> -% execute_when_enabled(fun() -> -% Topic = emqx_mgmt_util:urldecode(Topic0), -% case safe_validate(Topic) of -% true -> -% case get_topic_metrics(Topic) of -% {error, Reason} -> return({error, Reason}); -% Metrics -> return({ok, maps:from_list(Metrics)}) -% end; -% false -> -% return({error, invalid_topic_name}) -% end -% end); +api_spec() -> + { + [ + list_topic_api(), + list_topic_metrics_api(), + get_topic_metrics_api(), + reset_all_topic_metrics_api(), + reset_topic_metrics_api() + ], + [ + topic_metrics_schema() + ] + }. -% list(_Bindings, _Params) -> -% execute_when_enabled(fun() -> -% case get_all_topic_metrics() of -% {error, Reason} -> return({error, Reason}); -% Metrics -> return({ok, Metrics}) -% end -% end). +topic_metrics_schema() -> + #{ + topic_metrics => #{ + type => object, + properties => #{ + topic => #{type => string}, + create_time => #{ + type => string, + description => <<"Date time, rfc3339">> + }, + reset_time => #{ + type => string, + description => <<"Nullable. Date time, rfc3339.">> + }, + metrics => #{ + type => object, + properties => #{ + 'messages.dropped.count' => #{type => integer}, + 'messages.dropped.rate' => #{type => number}, + 'messages.in.count' => #{type => integer}, + 'messages.in.rate' => #{type => number}, + 'messages.out.count' => #{type => integer}, + 'messages.out.rate' => #{type => number}, + 'messages.qos0.in.count' => #{type => integer}, + 'messages.qos0.in.rate' => #{type => number}, + 'messages.qos0.out.count' => #{type => integer}, + 'messages.qos0.out.rate' => #{type => number}, + 'messages.qos1.in.count' => #{type => integer}, + 'messages.qos1.in.rate' => #{type => number}, + 'messages.qos1.out.count' => #{type => integer}, + 'messages.qos1.out.rate' => #{type => number}, + 'messages.qos2.in.count' => #{type => integer}, + 'messages.qos2.in.rate' => #{type => number}, + 'messages.qos2.out.count' => #{type => integer}, + 'messages.qos2.out.rate' => #{type => number} + } + } + } + } + }. -% register(_Bindings, Params) -> -% execute_when_enabled(fun() -> -% case proplists:get_value(<<"topic">>, Params) of -% undefined -> -% return({error, missing_required_params}); -% Topic -> -% case safe_validate(Topic) of -% true -> -% register_topic_metrics(Topic), -% return(ok); -% false -> -% return({error, invalid_topic_name}) -% end -% end -% end). +list_topic_api() -> + Path = "/mqtt/topic_metrics", + TopicSchema = #{ + type => object, + properties => #{ + topic => #{ + type => string}}}, + MetaData = #{ + get => #{ + description => <<"List topic">>, + responses => #{ + <<"200">> => + response_array_schema(<<"List topic">>, TopicSchema)}}}, + {Path, MetaData, list_topic}. -% unregister(Bindings, _Params) when map_size(Bindings) =:= 0 -> -% execute_when_enabled(fun() -> -% unregister_all_topic_metrics(), -% return(ok) -% end); +list_topic_metrics_api() -> + Path = "/mqtt/topic_metrics/metrics", + MetaData = #{ + get => #{ + description => <<"List topic metrics">>, + responses => #{ + <<"200">> => + response_array_schema(<<"List topic metrics">>, topic_metrics)}}}, + {Path, MetaData, list_topic_metrics}. -% unregister(#{topic := Topic0}, _Params) -> -% execute_when_enabled(fun() -> -% Topic = emqx_mgmt_util:urldecode(Topic0), -% case safe_validate(Topic) of -% true -> -% unregister_topic_metrics(Topic), -% return(ok); -% false -> -% return({error, invalid_topic_name}) -% end -% end). +get_topic_metrics_api() -> + Path = "/mqtt/topic_metrics/metrics/:topic", + MetaData = #{ + get => #{ + description => <<"List topic metrics">>, + parameters => [topic_param()], + responses => #{ + <<"200">> => + response_schema(<<"List topic metrics">>, topic_metrics)}}, + put => #{ + description => <<"Register topic metrics">>, + parameters => [topic_param()], + responses => #{ + <<"200">> => + response_schema(<<"Register topic metrics">>), + <<"409">> => + response_error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]), + <<"400">> => + response_error_schema(<<"Topic metrics already exist">>, [?BAD_REQUEST])}}, + delete => #{ + description => <<"Deregister topic metrics">>, + parameters => [topic_param()], + responses => #{ + <<"200">> => + response_schema(<<"Deregister topic metrics">>)}}}, + {Path, MetaData, operate_topic_metrics}. -% execute_when_enabled(Fun) -> -% case emqx_modules:find_module(topic_metrics) of -% true -> -% Fun(); -% false -> -% return({error, module_not_loaded}) -% end. +reset_all_topic_metrics_api() -> + Path = "/mqtt/topic_metrics/reset", + MetaData = #{ + put => #{ + description => <<"Reset all topic metrics">>, + responses => #{ + <<"200">> => + response_schema(<<"Reset all topic metrics">>)}}}, + {Path, MetaData, reset_all_topic_metrics}. -% safe_validate(Topic) -> -% try emqx_topic:validate(name, Topic) of -% true -> true -% catch -% error:_Error -> -% false -% end. +reset_topic_metrics_api() -> + Path = "/mqtt/topic_metrics/reset/:topic", + MetaData = #{ + put => #{ + description => <<"Reset topic metrics">>, + parameters => [topic_param()], + responses => #{ + <<"200">> => + response_schema(<<"Reset topic metrics">>)}}}, + {Path, MetaData, reset_topic_metrics}. -% get_all_topic_metrics() -> -% lists:foldl(fun(Topic, Acc) -> -% case get_topic_metrics(Topic) of -% {error, _Reason} -> -% Acc; -% Metrics -> -% [#{topic => Topic, metrics => Metrics} | Acc] -% end -% end, [], emqx_mod_topic_metrics:all_registered_topics()). +topic_param() -> + #{ + name => topic, + in => path, + required => true, + schema => #{type => string} + }. -% get_topic_metrics(Topic) -> -% lists:foldl(fun(Node, Acc) -> -% case get_topic_metrics(Node, Topic) of -% {error, _Reason} -> -% Acc; -% Metrics -> -% case Acc of -% [] -> Metrics; -% _ -> -% lists:foldl(fun({K, V}, Acc0) -> -% [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0] -% end, [], Acc) -% end -% end -% end, [], ekka_mnesia:running_nodes()). +topic_param(Request) -> + cowboy_req:binding(topic, Request). -% get_topic_metrics(Node, Topic) when Node =:= node() -> -% emqx_mod_topic_metrics:metrics(Topic); -% get_topic_metrics(Node, Topic) -> -% rpc_call(Node, get_topic_metrics, [Node, Topic]). +%%-------------------------------------------------------------------- +%% api callback +list_topic(get, _) -> + list_topics(). -% register_topic_metrics(Topic) -> -% Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], -% case lists:any(fun(Item) -> Item =:= ok end, Results) of -% true -> ok; -% false -> lists:last(Results) -% end. +list_topic_metrics(get, _) -> + list_metrics(). -% register_topic_metrics(Node, Topic) when Node =:= node() -> -% emqx_mod_topic_metrics:register(Topic); -% register_topic_metrics(Node, Topic) -> -% rpc_call(Node, register_topic_metrics, [Node, Topic]). +operate_topic_metrics(Method, Request) -> + Topic = topic_param(Request), + case Method of + get -> + get_metrics(Topic); + put -> + register(Topic); + delete -> + deregister(Topic) + end. -% unregister_topic_metrics(Topic) -> -% Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], -% case lists:any(fun(Item) -> Item =:= ok end, Results) of -% true -> ok; -% false -> lists:last(Results) -% end. +reset_all_topic_metrics(put, _) -> + reset(). -% unregister_topic_metrics(Node, Topic) when Node =:= node() -> -% emqx_mod_topic_metrics:unregister(Topic); -% unregister_topic_metrics(Node, Topic) -> -% rpc_call(Node, unregister_topic_metrics, [Node, Topic]). +reset_topic_metrics(put, Request) -> + Topic = topic_param(Request), + reset(Topic). -% unregister_all_topic_metrics() -> -% Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()], -% case lists:any(fun(Item) -> Item =:= ok end, Results) of -% true -> ok; -% false -> lists:last(Results) -% end. +%%-------------------------------------------------------------------- +%% api apply +list_topics() -> + {200, emqx_topic_metrics:all_registered_topics()}. -% unregister_all_topic_metrics(Node) when Node =:= node() -> -% emqx_mod_topic_metrics:unregister_all(); -% unregister_all_topic_metrics(Node) -> -% rpc_call(Node, unregister_topic_metrics, [Node]). +list_metrics() -> + {200, emqx_topic_metrics:metrics()}. -% rpc_call(Node, Fun, Args) -> -% case rpc:call(Node, ?MODULE, Fun, Args) of -% {badrpc, Reason} -> {error, Reason}; -% Res -> Res -% end. +register(Topic) -> + case emqx_topic_metrics:register(Topic) of + {error, quota_exceeded} -> + Message = list_to_binary(io_lib:format("Max topic metrics count is ~p", + [emqx_topic_metrics:max_limit()])), + {409, #{code => ?EXCEED_LIMIT, message => Message}}; + {error, already_existed} -> + Message = list_to_binary(io_lib:format("Topic ~p already registered", [Topic])), + {400, #{code => ?BAD_REQUEST, message => Message}}; + ok -> + {200} + end. -% return(_) -> -% %% TODO: V5 API -% ok. +deregister(Topic) -> + _ = emqx_topic_metrics:deregister(Topic), + {200}. + +get_metrics(Topic) -> + case emqx_topic_metrics:metrics(Topic) of + {error, topic_not_found} -> + Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), + {404, #{code => ?ERROR_TOPIC, message => Message}}; + Metrics -> + {200, Metrics} + end. + +reset() -> + ok = emqx_topic_metrics:reset(), + {200}. + +reset(Topic) -> + case emqx_topic_metrics:reset(Topic) of + {error, topic_not_found} -> + Message = list_to_binary(io_lib:format("Topic ~p not found", [Topic])), + {404, #{code => ?ERROR_TOPIC, message => Message}}; + ok -> + {200} + end. diff --git a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl index 5d1c5f84a..958131716 100644 --- a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl +++ b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl @@ -19,6 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). + +-define(TOPIC, <<""" +topic_metrics: { + topics : []}""">>). + -include_lib("eunit/include/eunit.hrl"). all() -> emqx_ct:all(?MODULE). @@ -26,6 +31,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([emqx_modules]), + ok = emqx_config:init_load(emqx_modules_schema, ?TOPIC), Config. end_per_suite(_Config) -> @@ -43,7 +49,7 @@ t_nonexistent_topic_metrics(_) -> ?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), % ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')), - emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:disable(). t_topic_metrics(_) -> @@ -60,7 +66,7 @@ t_topic_metrics(_) -> ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), % ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}), - emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:disable(). t_hook(_) -> @@ -91,5 +97,5 @@ t_hook(_) -> ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:deregister(<<"a/b/c">>), emqx_topic_metrics:disable(). diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index a96ee7a62..89fdde579 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -93,6 +93,7 @@ , bool/1 , int/1 , float/1 + , float/2 , map/1 , bin2hexstr/1 , hexstr2bin/1 @@ -516,6 +517,10 @@ int(Data) -> float(Data) -> emqx_rule_utils:float(Data). +float(Data, Decimals) when Decimals > 0 -> + Data1 = ?MODULE:float(Data), + list_to_float(float_to_list(Data1, [{decimals, Decimals}])). + map(Data) -> emqx_rule_utils:map(Data).