diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index d97c6542f..f329f4b22 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,41 +2,48 @@ {VSN, [ {"4.3.6", - [ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}} + , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.5", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.4", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.3", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.2", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.1", - [ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}} + , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.0", - [ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}} + , {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} @@ -47,41 +54,48 @@ ], [ {"4.3.6", - [ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.6"]}} + , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.5", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.5"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.4", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.4"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} ]}, {"4.3.3", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.3"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.2", - [ {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.2"]}} + , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.1", - [ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.1"]}} + , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} , {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]} ]}, {"4.3.0", - [ {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} + [ {update, emqx_rule_metrics, {advanced, ["4.3.0"]}} + , {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]} , {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]} , {apply,{emqx_stats,cancel_update,[rule_registery_stats]}} @@ -90,4 +104,4 @@ ]}, {<<".*">>, []} ] -}. +}. \ No newline at end of file diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index bc2b04c07..ed831bc3b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -25,16 +25,19 @@ , stop/0 ]). --export([ get_rules_matched/1 - , get_actions_taken/1 +-export([ get_actions_taken/1 , get_actions_success/1 , get_actions_error/1 , get_actions_exception/1 , get_actions_retry/1 + , get_rules_matched/1 + , get_rules_failed/1 + , get_rules_passed/1 + , get_rules_exception/1 + , get_rules_no_result/1 ]). --export([ inc_rules_matched/1 - , inc_rules_matched/2 +-export([ inc_rules_matched/2 , inc_actions_taken/1 , inc_actions_taken/2 , inc_actions_success/1 @@ -45,19 +48,21 @@ , inc_actions_exception/2 , inc_actions_retry/1 , inc_actions_retry/2 + , inc_rules_matched/1 + , inc_rules_failed/1 + , inc_rules_passed/1 + , inc_rules_exception/1 + , inc_rules_no_result/1 ]). -export([ inc/2 , inc/3 , get/2 - , get_overall/1 , get_rule_speed/1 - , get_overall_rule_speed/0 , create_rule_metrics/1 , create_metrics/1 , clear_rule_metrics/1 , clear_metrics/1 - , overall_metrics/0 ]). -export([ get_rule_metrics/1 @@ -129,22 +134,18 @@ get(Id, Metric) -> Ref -> counters:get(Ref, metrics_idx(Metric)) end. --spec(get_overall(atom()) -> number()). -get_overall(Metric) -> - emqx_metrics:val(Metric). - -spec(get_rule_speed(rule_id()) -> map()). get_rule_speed(Id) -> gen_server:call(?MODULE, {get_rule_speed, Id}). --spec(get_overall_rule_speed() -> map()). -get_overall_rule_speed() -> - gen_server:call(?MODULE, get_overall_rule_speed). - -spec(get_rule_metrics(rule_id()) -> map()). get_rule_metrics(Id) -> #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id), #{matched => get_rules_matched(Id), + failed => get_rules_failed(Id), + passed => get_rules_passed(Id), + exception => get_rules_exception(Id), + no_result => get_rules_no_result(Id), speed => Current, speed_max => Max, speed_last5m => Last5M @@ -174,17 +175,7 @@ inc(Id, Metric, Val) -> counters:add(couters_ref(Id), metrics_idx(Metric), Val); Ref -> counters:add(Ref, metrics_idx(Metric), Val) - end, - inc_overall(Metric, Val). - --spec(inc_overall(atom(), pos_integer()) -> ok). -inc_overall(Metric, Val) -> - emqx_metrics:inc(Metric, Val). - -inc_rules_matched(Id) -> - inc_rules_matched(Id, 1). -inc_rules_matched(Id, Val) -> - inc(Id, 'rules.matched', Val). + end. inc_actions_taken(Id) -> inc_actions_taken(Id, 1). @@ -211,8 +202,32 @@ inc_actions_retry(Id) -> inc_actions_retry(Id, Val) -> inc(Id, 'actions.retry', Val). -get_rules_matched(Id) -> - get(Id, 'rules.matched'). +inc_rules_matched(Id) -> + inc_rules_matched(Id, 1). +inc_rules_matched(Id, Val) -> + inc(Id, 'rules.matched', Val). + +inc_rules_failed(Id) -> + inc_rules_failed(Id, 1). +inc_rules_failed(Id, Val) -> + inc(Id, 'rules.failed', Val). + +inc_rules_passed(Id) -> + inc_rules_passed(Id, 1). +inc_rules_passed(Id, Val) -> + inc(Id, 'rules.passed', Val). + +inc_rules_exception(Id) -> + inc_rules_exception(Id, 1). +inc_rules_exception(Id, Val) -> + inc(Id, 'rules.failed', Val), + inc(Id, 'rules.exception', Val). + +inc_rules_no_result(Id) -> + inc_rules_no_result(Id, 1). +inc_rules_no_result(Id, Val) -> + inc(Id, 'rules.failed', Val), + inc(Id, 'rules.no_result', Val). get_actions_taken(Id) -> get(Id, 'actions.taken'). @@ -229,13 +244,26 @@ get_actions_exception(Id) -> get_actions_retry(Id) -> get(Id, 'actions.retry'). +get_rules_matched(Id) -> + get(Id, 'rules.matched'). + +get_rules_failed(Id) -> + get(Id, 'rules.failed'). + +get_rules_passed(Id) -> + get(Id, 'rules.passed'). + +get_rules_exception(Id) -> + get(Id, 'rules.exception'). + +get_rules_no_result(Id) -> + get(Id, 'rules.no_result'). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> erlang:process_flag(trap_exit, true), - %% the overall counters - [ok = emqx_metrics:ensure(Metric)|| Metric <- overall_metrics()], %% the speed metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {ok, #state{overall_rule_speed = #rule_speed{}}}. @@ -248,9 +276,6 @@ handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds Speed -> format_rule_speed(Speed) end, State}; -handle_call(get_overall_rule_speed, _From, State = #state{overall_rule_speed = RuleSpeed}) -> - {reply, format_rule_speed(RuleSpeed), State}; - handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) -> {reply, create_counters(Id), State#state{metric_ids = sets:add_element(Id, MIDs)}}; @@ -287,61 +312,74 @@ handle_info(ticking, State = #state{rule_speeds = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; -handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0, - overall_rule_speed = OverallRuleSpeed0}) -> +handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) -> RuleSpeeds = maps:map( fun(Id, RuleSpeed) -> calculate_speed(get_rules_matched(Id), RuleSpeed) end, RuleSpeeds0), - OverallRuleSpeed = calculate_speed(get_overall('rules.matched'), OverallRuleSpeed0), async_refresh_resource_status(), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {noreply, State#state{rule_speeds = RuleSpeeds, - overall_rule_speed = OverallRuleSpeed}}; + {noreply, State#state{rule_speeds = RuleSpeeds}}; handle_info(_Info, State) -> {noreply, State}. -code_change({down, Vsn}, State = #state{metric_ids = MIDs}, _Extra) - when Vsn =:= "4.2.0"; - Vsn =:= "4.2.1" -> - emqx_metrics:ensure('actions.failure'), - emqx_metrics:set('actions.failure', - emqx_metrics:val('actions.error') - + emqx_metrics:val('actions.exception')), - [begin - Matched = get_rules_matched(Id), - Succ = get_actions_success(Id), - Error = get_actions_error(Id), - Except = get_actions_exception(Id), - ok = delete_counters(Id), - ok = create_counters(Id), - inc_rules_matched(Id, Matched), - inc_actions_success(Id, Succ), - inc_actions_error(Id, Error + Except) - end || Id <- sets:to_list(MIDs)], - {ok, State}; +code_change({down, _Vsn}, State = #state{metric_ids = MIDs}, [Vsn]) -> + case string:tokens(Vsn, ".") of + ["4", "3", SVal] -> + {Val, []} = string:to_integer(SVal), + case Val =< 7 of + true -> + [begin + Passed = get_rules_passed(Id), + Take = get_actions_taken(Id), + Success = get_actions_success(Id), + Error = get_actions_error(Id), + Exception = get_actions_exception(Id), + Retry = get_actions_retry(Id), + ok = delete_counters(Id), + ok = create_counters(Id, 7), + inc_rules_matched(Id, Passed), + inc_actions_taken(Id, Take), + inc_actions_success(Id, Success), + inc_actions_error(Id, Error), + inc_actions_exception(Id, Exception), + inc_actions_retry(Id, Retry) + end || Id <- sets:to_list(MIDs)], + {ok, State}; + false -> {ok, State} + end; + _ -> {ok, State} + end; -code_change(Vsn, State = #state{metric_ids = MIDs}, _Extra) - when Vsn =:= "4.2.0"; - Vsn =:= "4.2.1" -> - [emqx_metrics:ensure(Name) - || Name <- - ['actions.error', 'actions.taken', - 'actions.exception', 'actions.retry' - ]], - emqx_metrics:set('actions.error', emqx_metrics:val('actions.failure')), - [begin - Matched = get_rules_matched(Id), - Succ = get_actions_success(Id), - Error = get_actions_error(Id), - ok = delete_counters(Id), - ok = create_counters(Id), - inc_rules_matched(Id, Matched), - inc_actions_success(Id, Succ), - inc_actions_error(Id, Error) - end || Id <- sets:to_list(MIDs)], - {ok, State}; +code_change(_Vsn, State = #state{metric_ids = MIDs}, [Vsn]) -> + case string:tokens(Vsn, ".") of + ["4", "3", SVal] -> + {Val, []} = string:to_integer(SVal), + case Val =< 7 of + true -> + [begin + Matched = get_rules_matched(Id), + Take = get_actions_taken(Id), + Success = get_actions_success(Id), + Error = get_actions_error(Id), + Exception = get_actions_exception(Id), + Retry = get_actions_retry(Id), + ok = delete_counters(Id), + ok = create_counters(Id), + inc_rules_matched(Id, Matched), + inc_rules_passed(Id, Matched), + inc_actions_taken(Id, Take), + inc_actions_success(Id, Success), + inc_actions_error(Id, Error), + inc_actions_exception(Id, Exception), + inc_actions_retry(Id, Retry) + end || Id <- sets:to_list(MIDs)], + {ok, State}; + false -> {ok, State} + end; + _ -> {ok, State} + end; code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -361,10 +399,12 @@ async_refresh_resource_status() -> spawn(emqx_rule_engine, refresh_resource_status, []). create_counters(Id) -> + create_counters(Id, max_counters_size()). +create_counters(Id, Size) -> case couters_ref(Id) of not_found -> ok = persistent_term:put(?CRefID(Id), - counters:new(max_counters_size(), [write_concurrency])); + counters:new(Size, [write_concurrency])); _Ref -> ok end. @@ -420,7 +460,7 @@ precision(Float, N) -> %% Metrics Definitions %%------------------------------------------------------------------------------ -max_counters_size() -> 7. +max_counters_size() -> 11. metrics_idx('rules.matched') -> 1; metrics_idx('actions.success') -> 2; @@ -428,13 +468,8 @@ metrics_idx('actions.error') -> 3; metrics_idx('actions.taken') -> 4; metrics_idx('actions.exception') -> 5; metrics_idx('actions.retry') -> 6; -metrics_idx(_) -> 7. - -overall_metrics() -> - [ 'rules.matched' - , 'actions.success' - , 'actions.error' - , 'actions.taken' - , 'actions.exception' - , 'actions.retry' - ]. +metrics_idx('rules.failed') -> 7; +metrics_idx('rules.passed') -> 8; +metrics_idx('rules.exception') -> 9; +metrics_idx('rules.no_result') -> 10; +metrics_idx(_) -> 11. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 080498d82..6c2482075 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -55,18 +55,23 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) -> catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), ?LOG(warning, "SELECT clause exception for ~s failed: ~p", [RuleID, Error]); _:{match_conditions_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), ?LOG(warning, "WHERE clause exception for ~s failed: ~p", [RuleID, Error]); _:{select_and_collect_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", [RuleID, Error]); _:{match_incase_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), ?LOG(warning, "INCASE clause exception for ~s failed: ~p", [RuleID, Error]); _:Error:StkTrace -> + emqx_rule_metrics:inc_rules_exception(RuleID), ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", [RuleID, Error, StkTrace]) end, @@ -78,6 +83,7 @@ apply_rule_discard_result(Rule, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), + ok = emqx_rule_metrics:inc_rules_matched(RuleID), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). do_apply_rule(#rule{id = RuleId, @@ -94,10 +100,14 @@ do_apply_rule(#rule{id = RuleId, case ?RAISE(match_conditions(Conditions, ColumnsAndSelected), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), Collection2 = filter_collection(Input, InCase, DoEach, Collection), + case Collection2 of + [] -> emqx_rule_metrics:inc_rules_no_result(RuleId); + _ -> emqx_rule_metrics:inc_rules_passed(RuleId) + end, {ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]}; false -> + ok = emqx_rule_metrics:inc_rules_no_result(RuleId), {error, nomatch} end; @@ -112,9 +122,10 @@ do_apply_rule(#rule{id = RuleId, case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), + ok = emqx_rule_metrics:inc_rules_passed(RuleId), {ok, take_actions(Actions, Selected, Input, OnFailed)}; false -> + ok = emqx_rule_metrics:inc_rules_no_result(RuleId), {error, nomatch} end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index d424c9f8a..f430c3d4e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -38,6 +38,7 @@ all() -> , {group, events} , {group, multi_actions} , {group, bugs} + , {group, rule_metrics} ]. suite() -> @@ -125,6 +126,10 @@ groups() -> t_sqlparse_true_false, t_sqlparse_new_map ]}, + {rule_metrics, [], + [t_metrics, + t_metrics1 + ]}, {events, [], [t_events ]}, @@ -1327,6 +1332,110 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). +t_metrics(_Config) -> + ok = emqx_rule_engine:load_providers(), + TopicRule = create_simple_repub_rule( + <<"t2">>, + "SELECT payload.msg as msg, payload.idx as idx " + "FROM \"t1\" " + "WHERE msg = 'hello' and idx + 1 > 2 "), + #rule{id = RuleId} = TopicRule, + ?assertEqual(0, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + ct:sleep(200), + PublishMoreTimes = fun(SomeMessage, Times) -> + [begin + emqtt:publish(Client, <<"t1">>, SomeMessage, 0), + ct:sleep(200) + end || _ <- lists:seq(1, Times)] end, + PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":5}">>, 10), + ?assertEqual(10, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)), + PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":0}">>, 10), + ?assertEqual(20, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)), + PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":\"somevalue\"}">>, 10), + ?assertEqual(30, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(20, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)), + emqtt:stop(Client), + emqx_rule_registry:remove_rule(TopicRule). + +t_metrics1(_Config) -> + ok = emqx_rule_engine:load_providers(), + TopicRule = create_simple_repub_rule( + <<"t2">>, + "FOREACH payload.sensors " + "DO clientid,item.name as name, item.idx + 1 as idx " + "INCASE item.idx >= 1 " + "FROM \"t1\" "), + #rule{id = RuleId} = TopicRule, + ?assertEqual(0, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + ct:sleep(200), + PublishMoreTimes = fun(SomeMessage, Times) -> + [begin + emqtt:publish(Client, <<"t1">>, SomeMessage, 0), + ct:sleep(200) + end || _ <- lists:seq(1, Times)] end, + Message = <<"{\"date\": \"2020-04-24\", + \"sensors\": [ + {\"name\": \"a\", \"idx\":0}, + {\"name\": \"b\", \"idx\":1}, + {\"name\": \"c\", \"idx\":2} + ]}">>, + PublishMoreTimes(Message, 10), + ?assertEqual(10, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)), + Message1 = <<"{\"date\": \"2020-04-24\", + \"sensors\": [ + {\"name\": \"a\", \"idx\":0}, + {\"name\": \"b\", \"idx\":0}, + {\"name\": \"c\", \"idx\":0} + ]}">>, + PublishMoreTimes(Message1, 10), + ?assertEqual(20, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)), + Message2 = <<"{\"date\": \"2020-04-24\", + \"sensors\": [ + {\"name\": \"a\", \"idx\":0}, + {\"name\": \"b\", \"idx\":1}, + {\"name\": \"c\", \"idx\":\"some string\"} + ]}">>, + PublishMoreTimes(Message2, 10), + ?assertEqual(30, emqx_rule_metrics:get_rules_matched(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)), + ?assertEqual(20, emqx_rule_metrics:get_rules_failed(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_exception(RuleId)), + ?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)), + + emqtt:stop(Client), + emqx_rule_registry:remove_rule(TopicRule). + t_sqlselect_multi_actoins_1(Config) -> %% We create 2 actions in the same rule: %% The first will fail and we need to make sure the diff --git a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl index e7c543c91..5a8d99d0e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl @@ -55,7 +55,6 @@ end_per_suite(_Config) -> init_per_testcase(_, Config) -> catch emqx_rule_metrics:stop(), {ok, _} = emqx_rule_metrics:start_link(), - [emqx_metrics:set(M, 0) || M <- emqx_rule_metrics:overall_metrics()], Config. end_per_testcase(_, _Config) -> @@ -81,8 +80,6 @@ t_action(_) -> ?assertEqual(1, emqx_rule_metrics:get_actions_exception(<<"action:1">>)), ?assertEqual(2, emqx_rule_metrics:get_actions_taken(<<"action:2">>)), ?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:3">>)), - ?assertEqual(3, emqx_rule_metrics:get_overall('actions.taken')), - ?assertEqual(1, emqx_rule_metrics:get_overall('actions.exception')), ok = emqx_rule_metrics:clear_metrics(<<"action:1">>), ok = emqx_rule_metrics:clear_metrics(<<"action:2">>), ?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:1">>)), @@ -92,12 +89,19 @@ t_rule(_) -> ok = emqx_rule_metrics:create_rule_metrics(<<"rule:1">>), ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>), ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.matched'), + ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.passed'), + ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.exception'), + ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.no_result'), + ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.failed'), ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'), ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'), ?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.matched')), + ?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.passed')), + ?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.exception')), + ?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.no_result')), + ?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.failed')), ?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')), ?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')), - ?assertEqual(3, emqx_rule_metrics:get_overall('rules.matched')), ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:1">>), ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>). @@ -119,24 +123,11 @@ rule_speed(_) -> ?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>), {?assert(Max =< 2), ?assert(Current =< 2)}), - ct:pal("===== Speed: ~p~n", [emqx_rule_metrics:get_overall_rule_speed()]), - ?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_overall_rule_speed(), - {?assert(Max =< 3), - ?assert(Current =< 3)}), ct:sleep(2100), ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>), {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_overall_rule_speed(), - {?assert(Max =< 3), - ?assert(Current == 0), - ?assert(Last5Min =< 1)}), - ct:sleep(3000), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_overall_rule_speed(), - {?assert(Max =< 3), - ?assert(Current == 0), - ?assert(Last5Min == 0)}), ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>), ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>). @@ -146,14 +137,10 @@ rule_speed(_) -> % t_get(_) -> % error('TODO'). -% t_get_overall(_) -> -% error('TODO'). % t_get_rule_speed(_) -> % error('TODO'). -% t_get_overall_rule_speed(_) -> -% error('TODO'). % t_get_rule_metrics(_) -> % error('TODO'). @@ -163,7 +150,3 @@ rule_speed(_) -> % t_inc(_) -> % error('TODO'). - -% t_overall_metrics(_) -> -% error('TODO'). -