diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index c31650b07..35f8908da 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -1,4 +1,5 @@ {emqx,1}. +{emqx_rule_engine,1}. {emqx_bridge,1}. {emqx_authn,1}. {emqx_authz,1}. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 84920109c..f9a3a6434 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -52,6 +52,7 @@ , update/3 , stop/2 , restart/2 + , reset_metrics/1 ]). -export([ send_message/2 @@ -210,6 +211,9 @@ lookup(Type, Name, RawConf) -> raw_config => RawConf}} end. +reset_metrics(ResourceId) -> + emqx_resource:reset_metrics(ResourceId). + stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8e1968049..12cbee000 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -34,6 +34,7 @@ , '/bridges/:id'/2 , '/bridges/:id/operation/:operation'/2 , '/nodes/:node/bridges/:id/operation/:operation'/2 + , '/bridges/:id/reset_metrics'/2 ]). -export([ lookup_from_local_node/2 @@ -76,7 +77,8 @@ api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation", - "/nodes/:node/bridges/:id/operation/:operation"]. + "/nodes/:node/bridges/:id/operation/:operation", + "/bridges/:id/reset_metrics"]. error_schema(Code, Message) when is_atom(Code) -> error_schema([Code], Message); @@ -282,6 +284,20 @@ schema("/bridges/:id") -> } }; +schema("/bridges/:id/reset_metrics") -> + #{ + 'operationId' => '/bridges/:id/reset_metrics', + put => #{ + tags => [<<"bridges">>], + summary => <<"Reset Bridge Metrics">>, + description => <<"Reset a bridge metrics by Id">>, + parameters => [param_path_id()], + responses => #{ + 200 => <<"Reset success">>, + 400 => error_schema(['BAD_REQUEST'], "RPC Call Failed") + } + } + }; schema("/bridges/:id/operation/:operation") -> #{ 'operationId' => '/bridges/:id/operation/:operation', @@ -363,6 +379,12 @@ schema("/nodes/:node/bridges/:id/operation/:operation") -> {500, error_msg('INTERNAL_ERROR', Reason)} end). +'/bridges/:id/reset_metrics'(put, #{bindings := #{id := Id}}) -> + case emqx_bridge:reset_metrics(Id) of + ok -> {200, <<"Reset success">>}; + Reason -> {400, error_msg('BAD_REQUEST', Reason)} + end. + lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> Nodes = mria_mnesia:running_nodes(), case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index e707bf262..b4215b6ab 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -333,6 +333,30 @@ t_enable_disable_bridges(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). +t_reset_bridges(_) -> + %% assert we there's no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + Port = start_http_server(fun handle_fun_200_ok/2), + URL1 = ?URL(Port, "abc"), + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)), + %ct:pal("the bridge ==== ~p", [Bridge]), + #{ <<"type">> := ?BRIDGE_TYPE + , <<"name">> := ?BRIDGE_NAME + , <<"status">> := <<"connected">> + , <<"node_status">> := [_|_] + , <<"metrics">> := _ + , <<"node_metrics">> := [_|_] + , <<"url">> := URL1 + } = jsx:decode(Bridge), + BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + {ok, 200, <<"Reset success">>} = request(put, uri(["bridges", BridgeID, "reset_metrics"]), []), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). + request(Method, Url, Body) -> request(<<"bridge_admin">>, Method, Url, Body). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 241f8ff2a..98c4490ee 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -32,6 +32,7 @@ , create_metrics/3 , create_metrics/4 , clear_metrics/2 + , reset_metrics/2 , has_metrics/2 ]). @@ -116,6 +117,10 @@ create_metrics(Name, Id, Metrics, RateMetrics) -> clear_metrics(Name, Id) -> gen_server:call(Name, {delete_metrics, Id}). +-spec(reset_metrics(handler_name(), metric_id()) -> ok). +reset_metrics(Name, Id) -> + gen_server:call(Name, {reset_metrics, Id}). + -spec(has_metrics(handler_name(), metric_id()) -> boolean()). has_metrics(Name, Id) -> case get_ref(Name, Id) of @@ -143,6 +148,13 @@ get_counters(Name, Id) -> get(Name, Id, Index) end, get_indexes(Name, Id)). +-spec reset_counters(handler_name(), metric_id()) -> ok. +reset_counters(Name, Id) -> + Indexes = maps:values(get_indexes(Name, Id)), + Ref = get_ref(Name, Id), + [counters:put(Ref, Idx, 0) || Idx <- Indexes ], + ok. + -spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> #{rate => get_rate(Name, Id), counters => get_counters(Name, Id)}. @@ -198,6 +210,17 @@ handle_call({delete_metrics, Id}, _From, _ -> maps:remove(Id, Rates) end}}; +handle_call({reset_metrics, Id}, _From, + State = #state{rates = Rates}) -> + {reply, reset_counters(get_self_name(), Id), + State#state{rates = case Rates of + undefined -> undefined; + _ -> ResetRate = + maps:map(fun(_Key, _Value) -> #rate{} end, + maps:get(Id, Rates, #{})), + maps:put(Id, ResetRate, Rates) + end}}; + handle_call(_Request, _From, State) -> {reply, ok, State}. diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 25c253a9a..3ae480c22 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -85,6 +85,44 @@ t_get_metrics(_) -> ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). +t_reset_metrics(_) -> + Metrics = [a, b, c], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), + %% all the metrics are set to zero at start + ?assertMatch(#{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ct:sleep(1500), + ok = emqx_plugin_libs_metrics:reset_metrics(?NAME, <<"testid">>), + ?LET(#{ + rate := #{ + a := #{current := CurrA, max := MaxA, last5m := _}, + b := #{current := CurrB, max := MaxB, last5m := _}, + c := #{current := CurrC, max := MaxC, last5m := _} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + {?assert(CurrA == 0), ?assert(CurrB == 0), ?assert(CurrC == 0), + ?assert(MaxA == 0), ?assert(MaxB == 0), ?assert(MaxC == 0)}), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + t_get_metrics_2(_) -> Metrics = [a, b, c], ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 99d663b09..7bafb57bb 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -53,6 +53,8 @@ , recreate_local/4 , remove/1 %% remove the config and stop the instance , remove_local/1 + , reset_metrics/1 + , reset_metrics_local/1 ]). %% Calls to the callback module with current resource state @@ -184,6 +186,14 @@ remove(InstId) -> remove_local(InstId) -> call_instance(InstId, {remove, InstId}). +-spec reset_metrics_local(instance_id()) -> ok. +reset_metrics_local(InstId) -> + call_instance(InstId, {reset_metrics, InstId}). + +-spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. +reset_metrics(InstId) -> + wrap_rpc(emqx_resource_proto_v1:reset_metrics(InstId)). + %% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). query(InstId, Request) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 41852cafb..af266f763 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -25,6 +25,7 @@ %% load resource instances from *.conf files -export([ lookup/1 , get_metrics/1 + , reset_metrics/1 , list_all/0 , list_group/1 ]). @@ -77,6 +78,9 @@ make_test_id() -> get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). +reset_metrics(InstId) -> + emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId). + force_lookup(InstId) -> {ok, _Group, Data} = lookup(InstId), Data. @@ -114,6 +118,9 @@ handle_call({create_dry_run, ResourceType, Config}, _From, State) -> handle_call({recreate, InstId, ResourceType, Config, Opts}, _From, State) -> {reply, do_recreate(InstId, ResourceType, Config, Opts), State}; +handle_call({reset_metrics, InstId}, _From, State) -> + {reply, do_reset_metrics(InstId), State}; + handle_call({remove, InstId}, _From, State) -> {reply, do_remove(InstId), State}; @@ -222,6 +229,9 @@ do_create_dry_run(ResourceType, Config) -> {error, Reason} end. +do_reset_metrics(Instance) -> + reset_metrics(Instance). + do_remove(Instance) -> do_remove(Instance, true). diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 3d62603fa..f39533c82 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -24,6 +24,7 @@ , create_dry_run/2 , recreate/4 , remove/1 + , reset_metrics/1 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -61,3 +62,8 @@ recreate(InstId, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall_return(ok). remove(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). + +-spec reset_metrics(emqx_resource:instance_id()) -> + emqx_cluster_rpc:multicall_return(ok). +reset_metrics(InstId) -> + emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 4019f983e..8a952e036 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -294,7 +294,7 @@ t_create_dry_run_local(_) -> ?assertEqual(undefined, whereis(test_resource)). -t_create_dry_run_local_failed(_) -> +t_create_dry_run_local_failed(_) -> {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, #{cteate_error => true}), ?assertEqual(error, Res), @@ -313,6 +313,19 @@ t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:max(array, 10), [[a,b,c,d]])), ?assertEqual(ok, erlang:apply(emqx_resource_validator:max(string, 10), ["less10"])). +t_reset_metrics(_) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}), + + #{pid := Pid} = emqx_resource:query(?ID, get_state), + emqx_resource:reset_metrics(?ID), + ?assert(is_process_alive(Pid)), + ok = emqx_resource:remove(?ID), + ?assertNot(is_process_alive(Pid)). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index f42094b0c..597daa378 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -56,6 +56,7 @@ , unload_hooks_for_rule/1 , maybe_add_metrics_for_rule/1 , clear_metrics_for_rule/1 + , reset_metrics_for_rule/1 ]). %% exported for `emqx_telemetry' @@ -195,6 +196,10 @@ maybe_add_metrics_for_rule(Id) -> clear_metrics_for_rule(Id) -> ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id). +-spec(reset_metrics_for_rule(rule_id()) -> ok). +reset_metrics_for_rule(Id) -> + emqx_plugin_libs_metrics:reset_metrics(rule_metrics, Id). + unload_hooks_for_rule(#{id := Id, from := Topics}) -> lists:foreach(fun(Topic) -> case get_rules_with_same_event(Topic) of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 181a5a244..cc2290e05 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -28,7 +28,7 @@ -export([api_spec/0, paths/0, schema/1, namespace/0]). %% API callbacks --export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2]). +-export(['/rule_events'/2, '/rule_test'/2, '/rules'/2, '/rules/:id'/2, '/rules/:id/reset_metrics'/2]). -define(ERR_NO_RULE(ID), list_to_binary(io_lib:format("Rule ~ts Not Found", [(ID)]))). -define(ERR_BADARGS(REASON), @@ -166,6 +166,21 @@ schema("/rules/:id") -> } }; +schema("/rules/:id/reset_metrics") -> + #{ + operationId => '/rules/:id/reset_metrics', + put => #{ + tags => [<<"rules">>], + description => <<"Reset a rule metrics">>, + summary => <<"Reset a Rule Metrics">>, + parameters => param_path_id(), + responses => #{ + 400 => error_schema('BAD_REQUEST', "RPC Call Failed"), + 200 => <<"Reset Success">> + } + } + }; + schema("/rule_test") -> #{ operationId => '/rule_test', @@ -262,10 +277,17 @@ replace_sql_clrf(#{ <<"sql">> := SQL } = Params) -> id => Id, reason => Reason}), {500, #{code => 'INTERNAL_ERROR', message => ?ERR_BADARGS(Reason)}} end. +'/rules/:id/reset_metrics'(put, #{bindings := #{id := RuleId}}) -> + case emqx_rule_engine_proto_v1:reset_metrics(RuleId) of + {ok, _TxnId, _Result} -> {200, <<"Reset Success">>}; + Failed -> {400, #{code => 'BAD_REQUEST', + message => err_msg(Failed)}} + end. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ + err_msg(Msg) -> list_to_binary(io_lib:format("~0p", [Msg])). diff --git a/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl new file mode 100644 index 000000000..cc3a79a74 --- /dev/null +++ b/apps/emqx_rule_engine/src/proto/emqx_rule_engine_proto_v1.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ introduced_in/0 + + , reset_metrics/1 + ]). + +-include_lib("emqx/include/bpapi.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). + +introduced_in() -> + "5.0.0". + +-spec reset_metrics(rule_id()) -> + emqx_cluster_rpc:multicall_return(ok). +reset_metrics(RuleId) -> + emqx_cluster_rpc:multicall(emqx_rule_engine, reset_metrics_for_rule, [RuleId]). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 39532a41a..498e34b4b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -46,6 +46,9 @@ t_crud_rule_api(_Config) -> ct:pal("RList : ~p", [Rules]), ?assert(length(Rules) > 0), + {200, Rule0} = emqx_rule_engine_api:'/rules/:id/reset_metrics'(put, #{bindings => #{id => RuleID}}), + ?assertEqual(<<"Reset Success">>, Rule0), + {200, Rule1} = emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}}), ct:pal("RShow : ~p", [Rule1]), ?assertEqual(Rule, Rule1),