Merge pull request #6731 from HJianBo/main-v4.4-merged-main-v4.3
Sync main-v4.3 into main-v4.4
This commit is contained in:
commit
0157ef830e
|
@ -25,16 +25,19 @@
|
||||||
, stop/0
|
, stop/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get_rules_matched/1
|
-export([ get_actions_taken/1
|
||||||
, get_actions_taken/1
|
|
||||||
, get_actions_success/1
|
, get_actions_success/1
|
||||||
, get_actions_error/1
|
, get_actions_error/1
|
||||||
, get_actions_exception/1
|
, get_actions_exception/1
|
||||||
, get_actions_retry/1
|
, get_actions_retry/1
|
||||||
|
, get_rules_matched/1
|
||||||
|
, get_rules_failed/1
|
||||||
|
, get_rules_passed/1
|
||||||
|
, get_rules_exception/1
|
||||||
|
, get_rules_no_result/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ inc_rules_matched/1
|
-export([ inc_rules_matched/2
|
||||||
, inc_rules_matched/2
|
|
||||||
, inc_actions_taken/1
|
, inc_actions_taken/1
|
||||||
, inc_actions_taken/2
|
, inc_actions_taken/2
|
||||||
, inc_actions_success/1
|
, inc_actions_success/1
|
||||||
|
@ -45,19 +48,21 @@
|
||||||
, inc_actions_exception/2
|
, inc_actions_exception/2
|
||||||
, inc_actions_retry/1
|
, inc_actions_retry/1
|
||||||
, inc_actions_retry/2
|
, inc_actions_retry/2
|
||||||
|
, inc_rules_matched/1
|
||||||
|
, inc_rules_failed/1
|
||||||
|
, inc_rules_passed/1
|
||||||
|
, inc_rules_exception/1
|
||||||
|
, inc_rules_no_result/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ inc/2
|
-export([ inc/2
|
||||||
, inc/3
|
, inc/3
|
||||||
, get/2
|
, get/2
|
||||||
, get_overall/1
|
|
||||||
, get_rule_speed/1
|
, get_rule_speed/1
|
||||||
, get_overall_rule_speed/0
|
|
||||||
, create_rule_metrics/1
|
, create_rule_metrics/1
|
||||||
, create_metrics/1
|
, create_metrics/1
|
||||||
, clear_rule_metrics/1
|
, clear_rule_metrics/1
|
||||||
, clear_metrics/1
|
, clear_metrics/1
|
||||||
, overall_metrics/0
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get_rule_metrics/1
|
-export([ get_rule_metrics/1
|
||||||
|
@ -131,22 +136,18 @@ get(Id, Metric) ->
|
||||||
Ref -> counters:get(Ref, metrics_idx(Metric))
|
Ref -> counters:get(Ref, metrics_idx(Metric))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(get_overall(atom()) -> number()).
|
|
||||||
get_overall(Metric) ->
|
|
||||||
emqx_metrics:val(Metric).
|
|
||||||
|
|
||||||
-spec(get_rule_speed(rule_id()) -> map()).
|
-spec(get_rule_speed(rule_id()) -> map()).
|
||||||
get_rule_speed(Id) ->
|
get_rule_speed(Id) ->
|
||||||
gen_server:call(?MODULE, {get_rule_speed, Id}).
|
gen_server:call(?MODULE, {get_rule_speed, Id}).
|
||||||
|
|
||||||
-spec(get_overall_rule_speed() -> map()).
|
|
||||||
get_overall_rule_speed() ->
|
|
||||||
gen_server:call(?MODULE, get_overall_rule_speed).
|
|
||||||
|
|
||||||
-spec(get_rule_metrics(rule_id()) -> map()).
|
-spec(get_rule_metrics(rule_id()) -> map()).
|
||||||
get_rule_metrics(Id) ->
|
get_rule_metrics(Id) ->
|
||||||
#{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
|
#{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
|
||||||
#{matched => get_rules_matched(Id),
|
#{matched => get_rules_matched(Id),
|
||||||
|
failed => get_rules_failed(Id),
|
||||||
|
passed => get_rules_passed(Id),
|
||||||
|
exception => get_rules_exception(Id),
|
||||||
|
no_result => get_rules_no_result(Id),
|
||||||
speed => Current,
|
speed => Current,
|
||||||
speed_max => Max,
|
speed_max => Max,
|
||||||
speed_last5m => Last5M
|
speed_last5m => Last5M
|
||||||
|
@ -176,17 +177,7 @@ inc(Id, Metric, Val) ->
|
||||||
counters:add(couters_ref(Id), metrics_idx(Metric), Val);
|
counters:add(couters_ref(Id), metrics_idx(Metric), Val);
|
||||||
Ref ->
|
Ref ->
|
||||||
counters:add(Ref, metrics_idx(Metric), Val)
|
counters:add(Ref, metrics_idx(Metric), Val)
|
||||||
end,
|
end.
|
||||||
inc_overall(Metric, Val).
|
|
||||||
|
|
||||||
-spec(inc_overall(atom(), pos_integer()) -> ok).
|
|
||||||
inc_overall(Metric, Val) ->
|
|
||||||
emqx_metrics:inc(Metric, Val).
|
|
||||||
|
|
||||||
inc_rules_matched(Id) ->
|
|
||||||
inc_rules_matched(Id, 1).
|
|
||||||
inc_rules_matched(Id, Val) ->
|
|
||||||
inc(Id, 'rules.matched', Val).
|
|
||||||
|
|
||||||
inc_actions_taken(Id) ->
|
inc_actions_taken(Id) ->
|
||||||
inc_actions_taken(Id, 1).
|
inc_actions_taken(Id, 1).
|
||||||
|
@ -213,8 +204,32 @@ inc_actions_retry(Id) ->
|
||||||
inc_actions_retry(Id, Val) ->
|
inc_actions_retry(Id, Val) ->
|
||||||
inc(Id, 'actions.retry', Val).
|
inc(Id, 'actions.retry', Val).
|
||||||
|
|
||||||
get_rules_matched(Id) ->
|
inc_rules_matched(Id) ->
|
||||||
get(Id, 'rules.matched').
|
inc_rules_matched(Id, 1).
|
||||||
|
inc_rules_matched(Id, Val) ->
|
||||||
|
inc(Id, 'rules.matched', Val).
|
||||||
|
|
||||||
|
inc_rules_failed(Id) ->
|
||||||
|
inc_rules_failed(Id, 1).
|
||||||
|
inc_rules_failed(Id, Val) ->
|
||||||
|
inc(Id, 'rules.failed', Val).
|
||||||
|
|
||||||
|
inc_rules_passed(Id) ->
|
||||||
|
inc_rules_passed(Id, 1).
|
||||||
|
inc_rules_passed(Id, Val) ->
|
||||||
|
inc(Id, 'rules.passed', Val).
|
||||||
|
|
||||||
|
inc_rules_exception(Id) ->
|
||||||
|
inc_rules_exception(Id, 1).
|
||||||
|
inc_rules_exception(Id, Val) ->
|
||||||
|
inc(Id, 'rules.failed', Val),
|
||||||
|
inc(Id, 'rules.exception', Val).
|
||||||
|
|
||||||
|
inc_rules_no_result(Id) ->
|
||||||
|
inc_rules_no_result(Id, 1).
|
||||||
|
inc_rules_no_result(Id, Val) ->
|
||||||
|
inc(Id, 'rules.failed', Val),
|
||||||
|
inc(Id, 'rules.no_result', Val).
|
||||||
|
|
||||||
get_actions_taken(Id) ->
|
get_actions_taken(Id) ->
|
||||||
get(Id, 'actions.taken').
|
get(Id, 'actions.taken').
|
||||||
|
@ -231,13 +246,26 @@ get_actions_exception(Id) ->
|
||||||
get_actions_retry(Id) ->
|
get_actions_retry(Id) ->
|
||||||
get(Id, 'actions.retry').
|
get(Id, 'actions.retry').
|
||||||
|
|
||||||
|
get_rules_matched(Id) ->
|
||||||
|
get(Id, 'rules.matched').
|
||||||
|
|
||||||
|
get_rules_failed(Id) ->
|
||||||
|
get(Id, 'rules.failed').
|
||||||
|
|
||||||
|
get_rules_passed(Id) ->
|
||||||
|
get(Id, 'rules.passed').
|
||||||
|
|
||||||
|
get_rules_exception(Id) ->
|
||||||
|
get(Id, 'rules.exception').
|
||||||
|
|
||||||
|
get_rules_no_result(Id) ->
|
||||||
|
get(Id, 'rules.no_result').
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
%% the overall counters
|
|
||||||
[ok = emqx_metrics:ensure(Metric) || Metric <- overall_metrics()],
|
|
||||||
%% the speed metrics
|
%% the speed metrics
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
{ok, #state{overall_rule_speed = #rule_speed{}}}.
|
{ok, #state{overall_rule_speed = #rule_speed{}}}.
|
||||||
|
@ -250,9 +278,6 @@ handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds
|
||||||
Speed -> format_rule_speed(Speed)
|
Speed -> format_rule_speed(Speed)
|
||||||
end, State};
|
end, State};
|
||||||
|
|
||||||
handle_call(get_overall_rule_speed, _From, State = #state{overall_rule_speed = RuleSpeed}) ->
|
|
||||||
{reply, format_rule_speed(RuleSpeed), State};
|
|
||||||
|
|
||||||
handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) ->
|
handle_call({create_metrics, Id}, _From, State = #state{metric_ids = MIDs}) ->
|
||||||
{reply, create_counters(Id), State#state{metric_ids = sets:add_element(Id, MIDs)}};
|
{reply, create_counters(Id), State#state{metric_ids = sets:add_element(Id, MIDs)}};
|
||||||
|
|
||||||
|
@ -289,61 +314,74 @@ handle_info(ticking, State = #state{rule_speeds = undefined}) ->
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0,
|
handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) ->
|
||||||
overall_rule_speed = OverallRuleSpeed0}) ->
|
|
||||||
RuleSpeeds = maps:map(
|
RuleSpeeds = maps:map(
|
||||||
fun(Id, RuleSpeed) ->
|
fun(Id, RuleSpeed) ->
|
||||||
calculate_speed(get_rules_matched(Id), RuleSpeed)
|
calculate_speed(get_rules_matched(Id), RuleSpeed)
|
||||||
end, RuleSpeeds0),
|
end, RuleSpeeds0),
|
||||||
OverallRuleSpeed = calculate_speed(get_overall('rules.matched'), OverallRuleSpeed0),
|
|
||||||
async_refresh_resource_status(),
|
async_refresh_resource_status(),
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
{noreply, State#state{rule_speeds = RuleSpeeds,
|
{noreply, State#state{rule_speeds = RuleSpeeds}};
|
||||||
overall_rule_speed = OverallRuleSpeed}};
|
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
code_change({down, Vsn}, State = #state{metric_ids = MIDs}, _Extra)
|
code_change({down, _Vsn}, State = #state{metric_ids = MIDs}, [Vsn]) ->
|
||||||
when Vsn =:= "4.2.0";
|
case string:tokens(Vsn, ".") of
|
||||||
Vsn =:= "4.2.1" ->
|
["4", "3", SVal] ->
|
||||||
emqx_metrics:ensure('actions.failure'),
|
{Val, []} = string:to_integer(SVal),
|
||||||
emqx_metrics:set('actions.failure',
|
case Val =< 7 of
|
||||||
emqx_metrics:val('actions.error')
|
true ->
|
||||||
+ emqx_metrics:val('actions.exception')),
|
|
||||||
[begin
|
[begin
|
||||||
Matched = get_rules_matched(Id),
|
Passed = get_rules_passed(Id),
|
||||||
Succ = get_actions_success(Id),
|
Take = get_actions_taken(Id),
|
||||||
|
Success = get_actions_success(Id),
|
||||||
Error = get_actions_error(Id),
|
Error = get_actions_error(Id),
|
||||||
Except = get_actions_exception(Id),
|
Exception = get_actions_exception(Id),
|
||||||
|
Retry = get_actions_retry(Id),
|
||||||
ok = delete_counters(Id),
|
ok = delete_counters(Id),
|
||||||
ok = create_counters(Id),
|
ok = create_counters(Id, 7),
|
||||||
inc_rules_matched(Id, Matched),
|
inc_rules_matched(Id, Passed),
|
||||||
inc_actions_success(Id, Succ),
|
inc_actions_taken(Id, Take),
|
||||||
inc_actions_error(Id, Error + Except)
|
inc_actions_success(Id, Success),
|
||||||
|
inc_actions_error(Id, Error),
|
||||||
|
inc_actions_exception(Id, Exception),
|
||||||
|
inc_actions_retry(Id, Retry)
|
||||||
end || Id <- sets:to_list(MIDs)],
|
end || Id <- sets:to_list(MIDs)],
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
false -> {ok, State}
|
||||||
|
end;
|
||||||
|
_ -> {ok, State}
|
||||||
|
end;
|
||||||
|
|
||||||
code_change(Vsn, State = #state{metric_ids = MIDs}, _Extra)
|
code_change(_Vsn, State = #state{metric_ids = MIDs}, [Vsn]) ->
|
||||||
when Vsn =:= "4.2.0";
|
case string:tokens(Vsn, ".") of
|
||||||
Vsn =:= "4.2.1" ->
|
["4", "3", SVal] ->
|
||||||
[emqx_metrics:ensure(Name)
|
{Val, []} = string:to_integer(SVal),
|
||||||
|| Name <-
|
case Val =< 7 of
|
||||||
['actions.error', 'actions.taken',
|
true ->
|
||||||
'actions.exception', 'actions.retry'
|
|
||||||
]],
|
|
||||||
emqx_metrics:set('actions.error', emqx_metrics:val('actions.failure')),
|
|
||||||
[begin
|
[begin
|
||||||
Matched = get_rules_matched(Id),
|
Matched = get_rules_matched(Id),
|
||||||
Succ = get_actions_success(Id),
|
Take = get_actions_taken(Id),
|
||||||
|
Success = get_actions_success(Id),
|
||||||
Error = get_actions_error(Id),
|
Error = get_actions_error(Id),
|
||||||
|
Exception = get_actions_exception(Id),
|
||||||
|
Retry = get_actions_retry(Id),
|
||||||
ok = delete_counters(Id),
|
ok = delete_counters(Id),
|
||||||
ok = create_counters(Id),
|
ok = create_counters(Id),
|
||||||
inc_rules_matched(Id, Matched),
|
inc_rules_matched(Id, Matched),
|
||||||
inc_actions_success(Id, Succ),
|
inc_rules_passed(Id, Matched),
|
||||||
inc_actions_error(Id, Error)
|
inc_actions_taken(Id, Take),
|
||||||
|
inc_actions_success(Id, Success),
|
||||||
|
inc_actions_error(Id, Error),
|
||||||
|
inc_actions_exception(Id, Exception),
|
||||||
|
inc_actions_retry(Id, Retry)
|
||||||
end || Id <- sets:to_list(MIDs)],
|
end || Id <- sets:to_list(MIDs)],
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
false -> {ok, State}
|
||||||
|
end;
|
||||||
|
_ -> {ok, State}
|
||||||
|
end;
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -363,10 +401,12 @@ async_refresh_resource_status() ->
|
||||||
spawn(emqx_rule_engine, refresh_resource_status, []).
|
spawn(emqx_rule_engine, refresh_resource_status, []).
|
||||||
|
|
||||||
create_counters(Id) ->
|
create_counters(Id) ->
|
||||||
|
create_counters(Id, max_counters_size()).
|
||||||
|
create_counters(Id, Size) ->
|
||||||
case couters_ref(Id) of
|
case couters_ref(Id) of
|
||||||
not_found ->
|
not_found ->
|
||||||
ok = persistent_term:put(?CRefID(Id),
|
ok = persistent_term:put(?CRefID(Id),
|
||||||
counters:new(max_counters_size(), [write_concurrency]));
|
counters:new(Size, [write_concurrency]));
|
||||||
_Ref -> ok
|
_Ref -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -424,7 +464,7 @@ precision(Float, N) ->
|
||||||
%% Metrics Definitions
|
%% Metrics Definitions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
max_counters_size() -> 7.
|
max_counters_size() -> 11.
|
||||||
|
|
||||||
metrics_idx('rules.matched') -> 1;
|
metrics_idx('rules.matched') -> 1;
|
||||||
metrics_idx('actions.success') -> 2;
|
metrics_idx('actions.success') -> 2;
|
||||||
|
@ -432,13 +472,8 @@ metrics_idx('actions.error') -> 3;
|
||||||
metrics_idx('actions.taken') -> 4;
|
metrics_idx('actions.taken') -> 4;
|
||||||
metrics_idx('actions.exception') -> 5;
|
metrics_idx('actions.exception') -> 5;
|
||||||
metrics_idx('actions.retry') -> 6;
|
metrics_idx('actions.retry') -> 6;
|
||||||
metrics_idx(_) -> 7.
|
metrics_idx('rules.failed') -> 7;
|
||||||
|
metrics_idx('rules.passed') -> 8;
|
||||||
overall_metrics() ->
|
metrics_idx('rules.exception') -> 9;
|
||||||
[ 'rules.matched'
|
metrics_idx('rules.no_result') -> 10;
|
||||||
, 'actions.success'
|
metrics_idx(_) -> 11.
|
||||||
, 'actions.error'
|
|
||||||
, 'actions.taken'
|
|
||||||
, 'actions.exception'
|
|
||||||
, 'actions.retry'
|
|
||||||
].
|
|
||||||
|
|
|
@ -55,18 +55,23 @@ apply_rules([Rule = #rule{id = RuleID}|More], Input) ->
|
||||||
catch
|
catch
|
||||||
%% ignore the errors if select or match failed
|
%% ignore the errors if select or match failed
|
||||||
_:{select_and_transform_error, Error} ->
|
_:{select_and_transform_error, Error} ->
|
||||||
|
emqx_rule_metrics:inc_rules_exception(RuleID),
|
||||||
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
|
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]);
|
[RuleID, Error]);
|
||||||
_:{match_conditions_error, Error} ->
|
_:{match_conditions_error, Error} ->
|
||||||
|
emqx_rule_metrics:inc_rules_exception(RuleID),
|
||||||
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
|
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]);
|
[RuleID, Error]);
|
||||||
_:{select_and_collect_error, Error} ->
|
_:{select_and_collect_error, Error} ->
|
||||||
|
emqx_rule_metrics:inc_rules_exception(RuleID),
|
||||||
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
|
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]);
|
[RuleID, Error]);
|
||||||
_:{match_incase_error, Error} ->
|
_:{match_incase_error, Error} ->
|
||||||
|
emqx_rule_metrics:inc_rules_exception(RuleID),
|
||||||
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
|
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
|
||||||
[RuleID, Error]);
|
[RuleID, Error]);
|
||||||
_:Error:StkTrace ->
|
_:Error:StkTrace ->
|
||||||
|
emqx_rule_metrics:inc_rules_exception(RuleID),
|
||||||
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
|
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
|
||||||
[RuleID, Error, StkTrace])
|
[RuleID, Error, StkTrace])
|
||||||
end,
|
end,
|
||||||
|
@ -78,6 +83,7 @@ apply_rule_discard_result(Rule, Input) ->
|
||||||
|
|
||||||
apply_rule(Rule = #rule{id = RuleID}, Input) ->
|
apply_rule(Rule = #rule{id = RuleID}, Input) ->
|
||||||
clear_rule_payload(),
|
clear_rule_payload(),
|
||||||
|
ok = emqx_rule_metrics:inc_rules_matched(RuleID),
|
||||||
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
|
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
|
||||||
|
|
||||||
do_apply_rule(#rule{id = RuleId,
|
do_apply_rule(#rule{id = RuleId,
|
||||||
|
@ -94,10 +100,14 @@ do_apply_rule(#rule{id = RuleId,
|
||||||
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
|
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
|
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
||||||
|
case Collection2 of
|
||||||
|
[] -> emqx_rule_metrics:inc_rules_no_result(RuleId);
|
||||||
|
_ -> emqx_rule_metrics:inc_rules_passed(RuleId)
|
||||||
|
end,
|
||||||
{ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]};
|
{ok, [take_actions(Actions, Coll, Input, OnFailed) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
|
ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -112,9 +122,10 @@ do_apply_rule(#rule{id = RuleId,
|
||||||
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
|
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
|
ok = emqx_rule_metrics:inc_rules_passed(RuleId),
|
||||||
{ok, take_actions(Actions, Selected, Input, OnFailed)};
|
{ok, take_actions(Actions, Selected, Input, OnFailed)};
|
||||||
false ->
|
false ->
|
||||||
|
ok = emqx_rule_metrics:inc_rules_no_result(RuleId),
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ all() ->
|
||||||
, {group, events}
|
, {group, events}
|
||||||
, {group, multi_actions}
|
, {group, multi_actions}
|
||||||
, {group, bugs}
|
, {group, bugs}
|
||||||
|
, {group, rule_metrics}
|
||||||
].
|
].
|
||||||
|
|
||||||
suite() ->
|
suite() ->
|
||||||
|
@ -125,6 +126,10 @@ groups() ->
|
||||||
t_sqlparse_true_false,
|
t_sqlparse_true_false,
|
||||||
t_sqlparse_new_map
|
t_sqlparse_new_map
|
||||||
]},
|
]},
|
||||||
|
{rule_metrics, [],
|
||||||
|
[t_metrics,
|
||||||
|
t_metrics1
|
||||||
|
]},
|
||||||
{events, [],
|
{events, [],
|
||||||
[t_events
|
[t_events
|
||||||
]},
|
]},
|
||||||
|
@ -1327,6 +1332,110 @@ t_sqlselect_3(_Config) ->
|
||||||
emqtt:stop(Client),
|
emqtt:stop(Client),
|
||||||
emqx_rule_registry:remove_rule(TopicRule).
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
|
|
||||||
|
t_metrics(_Config) ->
|
||||||
|
ok = emqx_rule_engine:load_providers(),
|
||||||
|
TopicRule = create_simple_repub_rule(
|
||||||
|
<<"t2">>,
|
||||||
|
"SELECT payload.msg as msg, payload.idx as idx "
|
||||||
|
"FROM \"t1\" "
|
||||||
|
"WHERE msg = 'hello' and idx + 1 > 2 "),
|
||||||
|
#rule{id = RuleId} = TopicRule,
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
ct:sleep(200),
|
||||||
|
PublishMoreTimes = fun(SomeMessage, Times) ->
|
||||||
|
[begin
|
||||||
|
emqtt:publish(Client, <<"t1">>, SomeMessage, 0),
|
||||||
|
ct:sleep(200)
|
||||||
|
end || _ <- lists:seq(1, Times)] end,
|
||||||
|
PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":5}">>, 10),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":0}">>, 10),
|
||||||
|
?assertEqual(20, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
PublishMoreTimes(<<"{\"msg\":\"hello\", \"idx\":\"somevalue\"}">>, 10),
|
||||||
|
?assertEqual(30, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(20, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
emqtt:stop(Client),
|
||||||
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
|
|
||||||
|
t_metrics1(_Config) ->
|
||||||
|
ok = emqx_rule_engine:load_providers(),
|
||||||
|
TopicRule = create_simple_repub_rule(
|
||||||
|
<<"t2">>,
|
||||||
|
"FOREACH payload.sensors "
|
||||||
|
"DO clientid,item.name as name, item.idx + 1 as idx "
|
||||||
|
"INCASE item.idx >= 1 "
|
||||||
|
"FROM \"t1\" "),
|
||||||
|
#rule{id = RuleId} = TopicRule,
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
ct:sleep(200),
|
||||||
|
PublishMoreTimes = fun(SomeMessage, Times) ->
|
||||||
|
[begin
|
||||||
|
emqtt:publish(Client, <<"t1">>, SomeMessage, 0),
|
||||||
|
ct:sleep(200)
|
||||||
|
end || _ <- lists:seq(1, Times)] end,
|
||||||
|
Message = <<"{\"date\": \"2020-04-24\",
|
||||||
|
\"sensors\": [
|
||||||
|
{\"name\": \"a\", \"idx\":0},
|
||||||
|
{\"name\": \"b\", \"idx\":1},
|
||||||
|
{\"name\": \"c\", \"idx\":2}
|
||||||
|
]}">>,
|
||||||
|
PublishMoreTimes(Message, 10),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
Message1 = <<"{\"date\": \"2020-04-24\",
|
||||||
|
\"sensors\": [
|
||||||
|
{\"name\": \"a\", \"idx\":0},
|
||||||
|
{\"name\": \"b\", \"idx\":0},
|
||||||
|
{\"name\": \"c\", \"idx\":0}
|
||||||
|
]}">>,
|
||||||
|
PublishMoreTimes(Message1, 10),
|
||||||
|
?assertEqual(20, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(0, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
Message2 = <<"{\"date\": \"2020-04-24\",
|
||||||
|
\"sensors\": [
|
||||||
|
{\"name\": \"a\", \"idx\":0},
|
||||||
|
{\"name\": \"b\", \"idx\":1},
|
||||||
|
{\"name\": \"c\", \"idx\":\"some string\"}
|
||||||
|
]}">>,
|
||||||
|
PublishMoreTimes(Message2, 10),
|
||||||
|
?assertEqual(30, emqx_rule_metrics:get_rules_matched(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_passed(RuleId)),
|
||||||
|
?assertEqual(20, emqx_rule_metrics:get_rules_failed(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_exception(RuleId)),
|
||||||
|
?assertEqual(10, emqx_rule_metrics:get_rules_no_result(RuleId)),
|
||||||
|
|
||||||
|
emqtt:stop(Client),
|
||||||
|
emqx_rule_registry:remove_rule(TopicRule).
|
||||||
|
|
||||||
t_sqlselect_multi_actoins_1(Config) ->
|
t_sqlselect_multi_actoins_1(Config) ->
|
||||||
%% We create 2 actions in the same rule:
|
%% We create 2 actions in the same rule:
|
||||||
%% The first will fail and we need to make sure the
|
%% The first will fail and we need to make sure the
|
||||||
|
|
|
@ -55,7 +55,6 @@ end_per_suite(_Config) ->
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
catch emqx_rule_metrics:stop(),
|
catch emqx_rule_metrics:stop(),
|
||||||
{ok, _} = emqx_rule_metrics:start_link(),
|
{ok, _} = emqx_rule_metrics:start_link(),
|
||||||
[emqx_metrics:set(M, 0) || M <- emqx_rule_metrics:overall_metrics()],
|
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
|
@ -81,8 +80,6 @@ t_action(_) ->
|
||||||
?assertEqual(1, emqx_rule_metrics:get_actions_exception(<<"action:1">>)),
|
?assertEqual(1, emqx_rule_metrics:get_actions_exception(<<"action:1">>)),
|
||||||
?assertEqual(2, emqx_rule_metrics:get_actions_taken(<<"action:2">>)),
|
?assertEqual(2, emqx_rule_metrics:get_actions_taken(<<"action:2">>)),
|
||||||
?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:3">>)),
|
?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:3">>)),
|
||||||
?assertEqual(3, emqx_rule_metrics:get_overall('actions.taken')),
|
|
||||||
?assertEqual(1, emqx_rule_metrics:get_overall('actions.exception')),
|
|
||||||
ok = emqx_rule_metrics:clear_metrics(<<"action:1">>),
|
ok = emqx_rule_metrics:clear_metrics(<<"action:1">>),
|
||||||
ok = emqx_rule_metrics:clear_metrics(<<"action:2">>),
|
ok = emqx_rule_metrics:clear_metrics(<<"action:2">>),
|
||||||
?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:1">>)),
|
?assertEqual(0, emqx_rule_metrics:get_actions_taken(<<"action:1">>)),
|
||||||
|
@ -92,12 +89,19 @@ t_rule(_) ->
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule:1">>),
|
ok = emqx_rule_metrics:create_rule_metrics(<<"rule:1">>),
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>),
|
ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>),
|
||||||
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.matched'),
|
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.matched'),
|
||||||
|
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.passed'),
|
||||||
|
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.exception'),
|
||||||
|
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.no_result'),
|
||||||
|
ok = emqx_rule_metrics:inc(<<"rule:1">>, 'rules.failed'),
|
||||||
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
||||||
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
||||||
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.matched')),
|
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.matched')),
|
||||||
|
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.passed')),
|
||||||
|
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.exception')),
|
||||||
|
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.no_result')),
|
||||||
|
?assertEqual(1, emqx_rule_metrics:get(<<"rule:1">>, 'rules.failed')),
|
||||||
?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')),
|
?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')),
|
||||||
?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')),
|
?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')),
|
||||||
?assertEqual(3, emqx_rule_metrics:get_overall('rules.matched')),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:1">>),
|
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:1">>),
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>).
|
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>).
|
||||||
|
|
||||||
|
@ -119,24 +123,11 @@ rule_speed(_) ->
|
||||||
?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
||||||
{?assert(Max =< 2),
|
{?assert(Max =< 2),
|
||||||
?assert(Current =< 2)}),
|
?assert(Current =< 2)}),
|
||||||
ct:pal("===== Speed: ~p~n", [emqx_rule_metrics:get_overall_rule_speed()]),
|
|
||||||
?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_overall_rule_speed(),
|
|
||||||
{?assert(Max =< 3),
|
|
||||||
?assert(Current =< 3)}),
|
|
||||||
ct:sleep(2100),
|
ct:sleep(2100),
|
||||||
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
||||||
{?assert(Max =< 2),
|
{?assert(Max =< 2),
|
||||||
?assert(Current == 0),
|
?assert(Current == 0),
|
||||||
?assert(Last5Min =< 0.67)}),
|
?assert(Last5Min =< 0.67)}),
|
||||||
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_overall_rule_speed(),
|
|
||||||
{?assert(Max =< 3),
|
|
||||||
?assert(Current == 0),
|
|
||||||
?assert(Last5Min =< 1)}),
|
|
||||||
ct:sleep(3000),
|
|
||||||
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_overall_rule_speed(),
|
|
||||||
{?assert(Max =< 3),
|
|
||||||
?assert(Current == 0),
|
|
||||||
?assert(Last5Min == 0)}),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
|
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>).
|
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>).
|
||||||
|
|
||||||
|
@ -146,14 +137,10 @@ rule_speed(_) ->
|
||||||
% t_get(_) ->
|
% t_get(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
% t_get_overall(_) ->
|
|
||||||
% error('TODO').
|
|
||||||
|
|
||||||
% t_get_rule_speed(_) ->
|
% t_get_rule_speed(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
% t_get_overall_rule_speed(_) ->
|
|
||||||
% error('TODO').
|
|
||||||
|
|
||||||
% t_get_rule_metrics(_) ->
|
% t_get_rule_metrics(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
@ -163,7 +150,3 @@ rule_speed(_) ->
|
||||||
|
|
||||||
% t_inc(_) ->
|
% t_inc(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
% t_overall_metrics(_) ->
|
|
||||||
% error('TODO').
|
|
||||||
|
|
||||||
|
|
2
build
2
build
|
@ -65,7 +65,7 @@ make_relup() {
|
||||||
if [ -d "$releases_dir" ]; then
|
if [ -d "$releases_dir" ]; then
|
||||||
while read -r zip; do
|
while read -r zip; do
|
||||||
local base_vsn
|
local base_vsn
|
||||||
base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")"
|
base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-f]{8})?")"
|
||||||
if [ ! -d "$releases_dir/$base_vsn" ]; then
|
if [ ! -d "$releases_dir/$base_vsn" ]; then
|
||||||
local tmp_dir
|
local tmp_dir
|
||||||
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
|
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
deprecated_function_calls,warnings_as_errors,deprecated_functions]}.
|
||||||
|
|
||||||
{dialyzer, [
|
{dialyzer, [
|
||||||
{warnings, [unmatched_returns, error_handling, race_conditions]},
|
{warnings, [unmatched_returns, error_handling]},
|
||||||
{plt_location, "."},
|
{plt_location, "."},
|
||||||
{plt_prefix, "emqx_dialyzer"},
|
{plt_prefix, "emqx_dialyzer"},
|
||||||
{plt_apps, all_apps},
|
{plt_apps, all_apps},
|
||||||
|
|
Loading…
Reference in New Issue