fix(rules): improve the names of the metrics

This commit is contained in:
Shawn 2022-01-06 15:46:32 +08:00
parent 67a60e1153
commit 72d55c8c0d
7 changed files with 105 additions and 65 deletions

View File

@ -165,7 +165,7 @@ handle_publish(Msg, undefined) ->
?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured", "_'ingress'_is_not_configured",
message => Msg}); message => Msg});
handle_publish(Msg0, Vars) -> handle_publish(#{properties := Props} = Msg0, Vars) ->
Msg = format_msg_received(Msg0), Msg = format_msg_received(Msg0),
?SLOG(debug, #{msg => "publish_to_local_broker", ?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
@ -174,7 +174,7 @@ handle_publish(Msg0, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok _ -> ok
end, end,
maybe_publish_to_local_broker(Msg0, Vars). maybe_publish_to_local_broker(Msg, Vars, Props).
handle_disconnected(Reason, Parent) -> handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}. Parent ! {disconnected, self(), Reason}.
@ -195,14 +195,15 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
process_config(Config) -> process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars,
Props) ->
case maps:get(local_topic, Vars, undefined) of case maps:get(local_topic, Vars, undefined) of
undefined -> undefined ->
ok; %% local topic is not set, discard it ok; %% local topic is not set, discard it
_ -> _ ->
case emqx_topic:match(Topic, SubTopic) of case emqx_topic:match(Topic, SubTopic) of
true -> true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)),
ok; ok;
false -> false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",

View File

@ -20,7 +20,7 @@
, from_binary/1 , from_binary/1
, make_pub_vars/2 , make_pub_vars/2
, to_remote_msg/2 , to_remote_msg/2
, to_broker_msg/2 , to_broker_msg/3
, estimate_size/1 , estimate_size/1
]). ]).
@ -78,9 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}. Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, to_broker_msg(#{dup := Dup} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, Props) ->
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),

View File

@ -524,14 +524,14 @@ t_ingress_mqtt_bridge_with_rules(_) ->
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId #{ <<"id">> := RuleId
, <<"metrics">> := #{ , <<"metrics">> := #{
<<"matched">> := 1, <<"sql.matched">> := 1,
<<"passed">> := 1, <<"sql.passed">> := 1,
<<"failed">> := 0, <<"sql.failed">> := 0,
<<"failed.exception">> := 0, <<"sql.failed.exception">> := 0,
<<"failed.no_result">> := 0, <<"sql.failed.no_result">> := 0,
<<"rate">> := _, <<"sql.matched.rate">> := _,
<<"rate_max">> := _, <<"sql.matched.rate.max">> := _,
<<"rate_last5m">> := _, <<"sql.matched.rate.last5m">> := _,
<<"outputs.total">> := 1, <<"outputs.total">> := 1,
<<"outputs.success">> := 1, <<"outputs.success">> := 1,
<<"outputs.failed">> := 0, <<"outputs.failed">> := 0,
@ -613,14 +613,14 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
#{ <<"id">> := RuleId #{ <<"id">> := RuleId
, <<"metrics">> := #{ , <<"metrics">> := #{
<<"matched">> := 1, <<"sql.matched">> := 1,
<<"passed">> := 1, <<"sql.passed">> := 1,
<<"failed">> := 0, <<"sql.failed">> := 0,
<<"failed.exception">> := 0, <<"sql.failed.exception">> := 0,
<<"failed.no_result">> := 0, <<"sql.failed.no_result">> := 0,
<<"rate">> := _, <<"sql.matched.rate">> := _,
<<"rate_max">> := _, <<"sql.matched.rate.max">> := _,
<<"rate_last5m">> := _, <<"sql.matched.rate.last5m">> := _,
<<"outputs.total">> := 1, <<"outputs.total">> := 1,
<<"outputs.success">> := 1, <<"outputs.success">> := 1,
<<"outputs.failed">> := 0, <<"outputs.failed">> := 0,

View File

@ -81,11 +81,41 @@ fields("rule_test") ->
]; ];
fields("metrics") -> fields("metrics") ->
[ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})} [ {"sql.matched", sc(integer(), #{
, {"rate", sc(float(), #{desc => "The rate of matched, times/second"})} desc => "How much times the FROM clause of the SQL is matched."
, {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})} })}
, {"rate_last5m", sc(float(), , {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})}
, {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})}
, {"sql.matched.rate.last5m", sc(float(),
#{desc => "The average rate of matched in last 5 mins, times/second"})} #{desc => "The average rate of matched in last 5 mins, times/second"})}
, {"sql.passed", sc(integer(), #{desc => "How much times the SQL is passed"})}
, {"sql.failed", sc(integer(), #{desc => "How much times the SQL is failed"})}
, {"sql.failed.exception", sc(integer(), #{
desc => "How much times the SQL is failed due to exceptions. "
"This may because of a crash when calling a SQL function, or "
"trying to do arithmetic operation on undefined variables"
})}
, {"sql.failed.unknown", sc(integer(), #{
desc => "How much times the SQL is failed due to an unknown error."
})}
, {"outputs.total", sc(integer(), #{
desc => "How much times the outputs are called by the rule. "
"This value may serveral times of 'sql.matched', depending on the "
"number of the outputs of the rule."
})}
, {"outputs.success", sc(integer(), #{
desc => "How much times the rule success to call the outputs."
})}
, {"outputs.failed", sc(integer(), #{
desc => "How much times the rule failed to call the outputs."
})}
, {"outputs.failed.out_of_service", sc(integer(), #{
desc => "How much times the rule failed to call outputs due to the output is "
"out of service. For example, a bridge is disabled or stopped."
})}
, {"outputs.failed.unknown", sc(integer(), #{
desc => "How much times the rule failed to call outputs due to to an unknown error."
})}
]; ];
fields("node_metrics") -> fields("node_metrics") ->

View File

@ -73,11 +73,19 @@
%% NOTE: This order cannot be changed! This is to make the metric working during relup. %% NOTE: This order cannot be changed! This is to make the metric working during relup.
%% Append elements to this list to add new metrics. %% Append elements to this list to add new metrics.
-define(METRICS, ['matched', 'passed', 'failed', 'failed.exception', 'failed.no_result', -define(METRICS, [ 'sql.matched'
'outputs.total', 'outputs.success', 'outputs.failed', 'outputs.failed.out_of_service', , 'sql.passed'
'outputs.failed.unknown']). , 'sql.failed'
, 'sql.failed.exception'
, 'sql.failed.no_result'
, 'outputs.total'
, 'outputs.success'
, 'outputs.failed'
, 'outputs.failed.out_of_service'
, 'outputs.failed.unknown'
]).
-define(RATE_METRICS, ['matched']). -define(RATE_METRICS, ['sql.matched']).
config_key_path() -> config_key_path() ->
[rule_engine, rules]. [rule_engine, rules].

View File

@ -46,36 +46,36 @@
-define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, -define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
#{ #{
matched => MATCH, 'sql.matched' => MATCH,
passed => PASS, 'sql.passed' => PASS,
failed => FAIL, 'sql.failed' => FAIL,
'failed.exception' => FAIL_EX, 'sql.failed.exception' => FAIL_EX,
'failed.no_result' => FAIL_NORES, 'sql.failed.no_result' => FAIL_NORES,
'outputs.total' => O_TOTAL, 'outputs.total' => O_TOTAL,
'outputs.failed' => O_FAIL, 'outputs.failed' => O_FAIL,
'outputs.failed.out_of_service' => O_FAIL_OOS, 'outputs.failed.out_of_service' => O_FAIL_OOS,
'outputs.failed.unknown' => O_FAIL_UNKNOWN, 'outputs.failed.unknown' => O_FAIL_UNKNOWN,
'outputs.success' => O_SUCC, 'outputs.success' => O_SUCC,
rate => RATE, 'sql.matched.rate' => RATE,
rate_max => RATE_MAX, 'sql.matched.rate.max' => RATE_MAX,
rate_last5m => RATE_5 'sql.matched.rate.last5m' => RATE_5
}). }).
-define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, -define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS,
O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5),
#{ #{
matched := MATCH, 'sql.matched' := MATCH,
passed := PASS, 'sql.passed' := PASS,
failed := FAIL, 'sql.failed' := FAIL,
'failed.exception' := FAIL_EX, 'sql.failed.exception' := FAIL_EX,
'failed.no_result' := FAIL_NORES, 'sql.failed.no_result' := FAIL_NORES,
'outputs.total' := O_TOTAL, 'outputs.total' := O_TOTAL,
'outputs.failed' := O_FAIL, 'outputs.failed' := O_FAIL,
'outputs.failed.out_of_service' := O_FAIL_OOS, 'outputs.failed.out_of_service' := O_FAIL_OOS,
'outputs.failed.unknown' := O_FAIL_UNKNOWN, 'outputs.failed.unknown' := O_FAIL_UNKNOWN,
'outputs.success' := O_SUCC, 'outputs.success' := O_SUCC,
rate := RATE, 'sql.matched.rate' := RATE,
rate_max := RATE_MAX, 'sql.matched.rate.max' := RATE_MAX,
rate_last5m := RATE_5 'sql.matched.rate.last5m' := RATE_5
}). }).
namespace() -> "rule". namespace() -> "rule".
@ -304,9 +304,9 @@ printable_function_name(Mod, Func) ->
get_rule_metrics(Id) -> get_rule_metrics(Id) ->
Format = fun (Node, #{ Format = fun (Node, #{
counters := counters :=
#{matched := Matched, passed := Passed, failed := Failed, #{'sql.matched' := Matched, 'sql.passed' := Passed, 'sql.failed' := Failed,
'failed.exception' := FailedEx, 'sql.failed.exception' := FailedEx,
'failed.no_result' := FailedNoRes, 'sql.failed.no_result' := FailedNoRes,
'outputs.total' := OTotal, 'outputs.total' := OTotal,
'outputs.failed' := OFailed, 'outputs.failed' := OFailed,
'outputs.failed.out_of_service' := OFailedOOS, 'outputs.failed.out_of_service' := OFailedOOS,
@ -314,7 +314,7 @@ get_rule_metrics(Id) ->
'outputs.success' := OFailedSucc 'outputs.success' := OFailedSucc
}, },
rate := rate :=
#{matched := #{'sql.matched' :=
#{current := Current, max := Max, last5m := Last5M} #{current := Current, max := Max, last5m := Last5M}
}}) -> }}) ->
#{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes, #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes,

View File

@ -55,23 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) ->
catch catch
%% ignore the errors if select or match failed %% ignore the errors if select or match failed
_:{select_and_transform_error, Error} -> _:{select_and_transform_error, Error} ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
?SLOG(warning, #{msg => "SELECT_clause_exception", ?SLOG(warning, #{msg => "SELECT_clause_exception",
rule_id => RuleID, reason => Error}); rule_id => RuleID, reason => Error});
_:{match_conditions_error, Error} -> _:{match_conditions_error, Error} ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
?SLOG(warning, #{msg => "WHERE_clause_exception", ?SLOG(warning, #{msg => "WHERE_clause_exception",
rule_id => RuleID, reason => Error}); rule_id => RuleID, reason => Error});
_:{select_and_collect_error, Error} -> _:{select_and_collect_error, Error} ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
?SLOG(warning, #{msg => "FOREACH_clause_exception", ?SLOG(warning, #{msg => "FOREACH_clause_exception",
rule_id => RuleID, reason => Error}); rule_id => RuleID, reason => Error});
_:{match_incase_error, Error} -> _:{match_incase_error, Error} ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
?SLOG(warning, #{msg => "INCASE_clause_exception", ?SLOG(warning, #{msg => "INCASE_clause_exception",
rule_id => RuleID, reason => Error}); rule_id => RuleID, reason => Error});
Class:Error:StkTrace -> Class:Error:StkTrace ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'),
?SLOG(error, #{msg => "apply_rule_failed", ?SLOG(error, #{msg => "apply_rule_failed",
rule_id => RuleID, rule_id => RuleID,
exception => Class, exception => Class,
@ -86,7 +86,7 @@ apply_rule_discard_result(Rule, Input) ->
ok. ok.
apply_rule(Rule = #{id := RuleID}, Input) -> apply_rule(Rule = #{id := RuleID}, Input) ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, matched), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'),
clear_rule_payload(), clear_rule_payload(),
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})).
@ -108,13 +108,13 @@ do_apply_rule(#{
Collection2 = filter_collection(Input, InCase, DoEach, Collection), Collection2 = filter_collection(Input, InCase, DoEach, Collection),
case Collection2 of case Collection2 of
[] -> [] ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'); ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result');
_ -> _ ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed) ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed')
end, end,
{ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]};
false -> false ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
{error, nomatch} {error, nomatch}
end; end;
@ -129,10 +129,10 @@ do_apply_rule(#{id := RuleId,
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
true -> true ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'),
{ok, handle_output_list(RuleId, Outputs, Selected, Input)}; {ok, handle_output_list(RuleId, Outputs, Selected, Input)};
false -> false ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'),
{error, nomatch} {error, nomatch}
end. end.
@ -254,10 +254,10 @@ handle_output(RuleId, OutId, Selected, Envs) ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'),
Result Result
catch catch
throw:Reason -> throw:out_of_service ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'),
?SLOG(error, #{msg => "output_failed", output => OutId, reason => Reason}); ?SLOG(warning, #{msg => "out_of_service", output => OutId});
Err:Reason:ST -> Err:Reason:ST ->
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'),
ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'),
@ -269,10 +269,11 @@ do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) ->
?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}),
case emqx_bridge:send_message(BridgeId, Selected) of case emqx_bridge:send_message(BridgeId, Selected) of
{error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped ->
throw(bridge_out_of_service); throw(out_of_service);
Result -> Result Result -> Result
end; end;
do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
%% the function can also throw 'out_of_service'
Mod:Func(Selected, Envs, Args). Mod:Func(Selected, Envs, Args).
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->