diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 2b2bad59d..5cc6c7796 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -165,7 +165,7 @@ handle_publish(Msg, undefined) -> ?SLOG(error, #{msg => "cannot_publish_to_local_broker_as" "_'ingress'_is_not_configured", message => Msg}); -handle_publish(Msg0, Vars) -> +handle_publish(#{properties := Props} = Msg0, Vars) -> Msg = format_msg_received(Msg0), ?SLOG(debug, #{msg => "publish_to_local_broker", message => Msg, vars => Vars}), @@ -174,7 +174,7 @@ handle_publish(Msg0, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - maybe_publish_to_local_broker(Msg0, Vars). + maybe_publish_to_local_broker(Msg, Vars, Props). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. @@ -195,14 +195,15 @@ sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) -> process_config(Config) -> maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). -maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars, + Props) -> case maps:get(local_topic, Vars, undefined) of undefined -> ok; %% local topic is not set, discard it _ -> case emqx_topic:match(Topic, SubTopic) of true -> - _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)), ok; false -> ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 449837d51..e8e4580f4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -20,7 +20,7 @@ , from_binary/1 , make_pub_vars/2 , to_remote_msg/2 - , to_broker_msg/2 + , to_broker_msg/3 , estimate_size/1 ]). @@ -78,9 +78,9 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection -to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, +to_broker_msg(#{dup := Dup} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, - local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> + local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}, Props) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 821d02278..e7277083e 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -524,14 +524,14 @@ t_ingress_mqtt_bridge_with_rules(_) -> {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId , <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"rate">> := _, - <<"rate_max">> := _, - <<"rate_last5m">> := _, + <<"sql.matched">> := 1, + <<"sql.passed">> := 1, + <<"sql.failed">> := 0, + <<"sql.failed.exception">> := 0, + <<"sql.failed.no_result">> := 0, + <<"sql.matched.rate">> := _, + <<"sql.matched.rate.max">> := _, + <<"sql.matched.rate.last5m">> := _, <<"outputs.total">> := 1, <<"outputs.success">> := 1, <<"outputs.failed">> := 0, @@ -613,14 +613,14 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), #{ <<"id">> := RuleId , <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"rate">> := _, - <<"rate_max">> := _, - <<"rate_last5m">> := _, + <<"sql.matched">> := 1, + <<"sql.passed">> := 1, + <<"sql.failed">> := 0, + <<"sql.failed.exception">> := 0, + <<"sql.failed.no_result">> := 0, + <<"sql.matched.rate">> := _, + <<"sql.matched.rate.max">> := _, + <<"sql.matched.rate.last5m">> := _, <<"outputs.total">> := 1, <<"outputs.success">> := 1, <<"outputs.failed">> := 0, diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index d992cdc07..7af10a342 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -81,11 +81,41 @@ fields("rule_test") -> ]; fields("metrics") -> - [ {"matched", sc(integer(), #{desc => "How much times this rule is matched"})} - , {"rate", sc(float(), #{desc => "The rate of matched, times/second"})} - , {"rate_max", sc(float(), #{desc => "The max rate of matched, times/second"})} - , {"rate_last5m", sc(float(), + [ {"sql.matched", sc(integer(), #{ + desc => "How much times the FROM clause of the SQL is matched." + })} + , {"sql.matched.rate", sc(float(), #{desc => "The rate of matched, times/second"})} + , {"sql.matched.rate.max", sc(float(), #{desc => "The max rate of matched, times/second"})} + , {"sql.matched.rate.last5m", sc(float(), #{desc => "The average rate of matched in last 5 mins, times/second"})} + , {"sql.passed", sc(integer(), #{desc => "How much times the SQL is passed"})} + , {"sql.failed", sc(integer(), #{desc => "How much times the SQL is failed"})} + , {"sql.failed.exception", sc(integer(), #{ + desc => "How much times the SQL is failed due to exceptions. " + "This may because of a crash when calling a SQL function, or " + "trying to do arithmetic operation on undefined variables" + })} + , {"sql.failed.unknown", sc(integer(), #{ + desc => "How much times the SQL is failed due to an unknown error." + })} + , {"outputs.total", sc(integer(), #{ + desc => "How much times the outputs are called by the rule. " + "This value may serveral times of 'sql.matched', depending on the " + "number of the outputs of the rule." + })} + , {"outputs.success", sc(integer(), #{ + desc => "How much times the rule success to call the outputs." + })} + , {"outputs.failed", sc(integer(), #{ + desc => "How much times the rule failed to call the outputs." + })} + , {"outputs.failed.out_of_service", sc(integer(), #{ + desc => "How much times the rule failed to call outputs due to the output is " + "out of service. For example, a bridge is disabled or stopped." + })} + , {"outputs.failed.unknown", sc(integer(), #{ + desc => "How much times the rule failed to call outputs due to to an unknown error." + })} ]; fields("node_metrics") -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 99f9050a7..8749fea13 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -73,11 +73,19 @@ %% NOTE: This order cannot be changed! This is to make the metric working during relup. %% Append elements to this list to add new metrics. --define(METRICS, ['matched', 'passed', 'failed', 'failed.exception', 'failed.no_result', - 'outputs.total', 'outputs.success', 'outputs.failed', 'outputs.failed.out_of_service', - 'outputs.failed.unknown']). +-define(METRICS, [ 'sql.matched' + , 'sql.passed' + , 'sql.failed' + , 'sql.failed.exception' + , 'sql.failed.no_result' + , 'outputs.total' + , 'outputs.success' + , 'outputs.failed' + , 'outputs.failed.out_of_service' + , 'outputs.failed.unknown' + ]). --define(RATE_METRICS, ['matched']). +-define(RATE_METRICS, ['sql.matched']). config_key_path() -> [rule_engine, rules]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index e57fd5ecd..b983747e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -46,36 +46,36 @@ -define(METRICS(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), #{ - matched => MATCH, - passed => PASS, - failed => FAIL, - 'failed.exception' => FAIL_EX, - 'failed.no_result' => FAIL_NORES, + 'sql.matched' => MATCH, + 'sql.passed' => PASS, + 'sql.failed' => FAIL, + 'sql.failed.exception' => FAIL_EX, + 'sql.failed.no_result' => FAIL_NORES, 'outputs.total' => O_TOTAL, 'outputs.failed' => O_FAIL, 'outputs.failed.out_of_service' => O_FAIL_OOS, 'outputs.failed.unknown' => O_FAIL_UNKNOWN, 'outputs.success' => O_SUCC, - rate => RATE, - rate_max => RATE_MAX, - rate_last5m => RATE_5 + 'sql.matched.rate' => RATE, + 'sql.matched.rate.max' => RATE_MAX, + 'sql.matched.rate.last5m' => RATE_5 }). -define(metrics(MATCH, PASS, FAIL, FAIL_EX, FAIL_NORES, O_TOTAL, O_FAIL, O_FAIL_OOS, O_FAIL_UNKNOWN, O_SUCC, RATE, RATE_MAX, RATE_5), #{ - matched := MATCH, - passed := PASS, - failed := FAIL, - 'failed.exception' := FAIL_EX, - 'failed.no_result' := FAIL_NORES, + 'sql.matched' := MATCH, + 'sql.passed' := PASS, + 'sql.failed' := FAIL, + 'sql.failed.exception' := FAIL_EX, + 'sql.failed.no_result' := FAIL_NORES, 'outputs.total' := O_TOTAL, 'outputs.failed' := O_FAIL, 'outputs.failed.out_of_service' := O_FAIL_OOS, 'outputs.failed.unknown' := O_FAIL_UNKNOWN, 'outputs.success' := O_SUCC, - rate := RATE, - rate_max := RATE_MAX, - rate_last5m := RATE_5 + 'sql.matched.rate' := RATE, + 'sql.matched.rate.max' := RATE_MAX, + 'sql.matched.rate.last5m' := RATE_5 }). namespace() -> "rule". @@ -304,9 +304,9 @@ printable_function_name(Mod, Func) -> get_rule_metrics(Id) -> Format = fun (Node, #{ counters := - #{matched := Matched, passed := Passed, failed := Failed, - 'failed.exception' := FailedEx, - 'failed.no_result' := FailedNoRes, + #{'sql.matched' := Matched, 'sql.passed' := Passed, 'sql.failed' := Failed, + 'sql.failed.exception' := FailedEx, + 'sql.failed.no_result' := FailedNoRes, 'outputs.total' := OTotal, 'outputs.failed' := OFailed, 'outputs.failed.out_of_service' := OFailedOOS, @@ -314,7 +314,7 @@ get_rule_metrics(Id) -> 'outputs.success' := OFailedSucc }, rate := - #{matched := + #{'sql.matched' := #{current := Current, max := Max, last5m := Last5M} }}) -> #{ metrics => ?METRICS(Matched, Passed, Failed, FailedEx, FailedNoRes, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index e30a34b84..080a36511 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -55,23 +55,23 @@ apply_rules([Rule = #{id := RuleID}|More], Input) -> catch %% ignore the errors if select or match failed _:{select_and_transform_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "SELECT_clause_exception", rule_id => RuleID, reason => Error}); _:{match_conditions_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "WHERE_clause_exception", rule_id => RuleID, reason => Error}); _:{select_and_collect_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "FOREACH_clause_exception", rule_id => RuleID, reason => Error}); _:{match_incase_error, Error} -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(warning, #{msg => "INCASE_clause_exception", rule_id => RuleID, reason => Error}); Class:Error:StkTrace -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'failed.exception'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.failed.exception'), ?SLOG(error, #{msg => "apply_rule_failed", rule_id => RuleID, exception => Class, @@ -86,7 +86,7 @@ apply_rule_discard_result(Rule, Input) -> ok. apply_rule(Rule = #{id := RuleID}, Input) -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, matched), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleID, 'sql.matched'), clear_rule_payload(), do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). @@ -108,13 +108,13 @@ do_apply_rule(#{ Collection2 = filter_collection(Input, InCase, DoEach, Collection), case Collection2 of [] -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'); + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'); _ -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed) + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed') end, {ok, [handle_output_list(RuleId, Outputs, Coll, Input) || Coll <- Collection2]}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end; @@ -129,10 +129,10 @@ do_apply_rule(#{id := RuleId, case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, passed), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.passed'), {ok, handle_output_list(RuleId, Outputs, Selected, Input)}; false -> - ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'failed.no_result'), + ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'sql.failed.no_result'), {error, nomatch} end. @@ -254,10 +254,10 @@ handle_output(RuleId, OutId, Selected, Envs) -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.success'), Result catch - throw:Reason -> + throw:out_of_service -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.out_of_service'), - ?SLOG(error, #{msg => "output_failed", output => OutId, reason => Reason}); + ?SLOG(warning, #{msg => "out_of_service", output => OutId}); Err:Reason:ST -> ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed'), ok = emqx_plugin_libs_metrics:inc(rule_metrics, RuleId, 'outputs.failed.unknown'), @@ -269,10 +269,11 @@ do_handle_output(BridgeId, Selected, _Envs) when is_binary(BridgeId) -> ?TRACE("BRIDGE", "output_to_bridge", #{bridge_id => BridgeId}), case emqx_bridge:send_message(BridgeId, Selected) of {error, {Err, _}} when Err == bridge_not_found; Err == bridge_stopped -> - throw(bridge_out_of_service); + throw(out_of_service); Result -> Result end; do_handle_output(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> + %% the function can also throw 'out_of_service' Mod:Func(Selected, Envs, Args). eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->