From 73e19d84ee2c169f6783a8bcd2de9450234adbb3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Aug 2022 23:47:58 +0800 Subject: [PATCH] feat: use the new metrics to bridge APIs --- apps/emqx_bridge/i18n/emqx_bridge_schema.conf | 152 ++++++++++++++++-- apps/emqx_bridge/include/emqx_bridge.hrl | 89 ++++++++++ apps/emqx_bridge/src/emqx_bridge_api.erl | 99 +----------- .../src/schema/emqx_bridge_schema.erl | 21 ++- .../test/emqx_bridge_mqtt_SUITE.erl | 64 ++++++-- .../src/emqx_connector_mqtt.erl | 3 +- apps/emqx_resource/src/emqx_resource.erl | 17 +- .../src/emqx_resource_manager.erl | 4 +- .../src/emqx_resource_worker.erl | 8 +- .../test/emqx_resource_SUITE.erl | 8 +- .../src/emqx_rule_sqltester.erl | 18 ++- 11 files changed, 327 insertions(+), 156 deletions(-) create mode 100644 apps/emqx_bridge/include/emqx_bridge.hrl diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index 704fd7bd7..06cc41a91 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -78,36 +78,149 @@ emqx_bridge_schema { } } + metric_batched { + desc { + en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" + zh: """当前积压在内存里,等待批量发送的消息个数""" + } + label: { + en: "Batched" + zh: "等待批量发送" + } + } + + metric_dropped { + desc { + en: """Count of messages dropped.""" + zh: """被丢弃的消息个数。""" + } + label: { + en: "Dropped" + zh: "丢弃" + } + } + + metric_dropped_other { + desc { + en: """Count of messages dropped due to other reasons.""" + zh: """因为其他原因被丢弃的消息个数。""" + } + label: { + en: "Dropped Other" + zh: "其他丢弃" + } + } + metric_dropped_queue_full { + desc { + en: """Count of messages dropped due to the queue is full.""" + zh: """因为队列已满被丢弃的消息个数。""" + } + label: { + en: "Dropped Queue Full" + zh: "队列已满被丢弃" + } + } + metric_dropped_queue_not_enabled { + desc { + en: """Count of messages dropped due to the queue is not enabled.""" + zh: """因为队列未启用被丢弃的消息个数。""" + } + label: { + en: "Dropped Queue Disabled" + zh: "队列未启用被丢弃" + } + } + metric_dropped_resource_not_found { + desc { + en: """Count of messages dropped due to the resource is not found.""" + zh: """因为资源不存在被丢弃的消息个数。""" + } + label: { + en: "Dropped Resource NotFound" + zh: "资源不存在被丢弃" + } + } + metric_dropped_resource_stopped { + desc { + en: """Count of messages dropped due to the resource is stopped.""" + zh: """因为资源已停用被丢弃的消息个数。""" + } + label: { + en: "Dropped Resource Stopped" + zh: "资源停用被丢弃" + } + } metric_matched { desc { - en: """Count of this bridge is queried""" - zh: """Bridge 执行操作的次数""" + en: """Count of this bridge is matched and queried.""" + zh: """Bridge 被匹配到(被请求)的次数。""" } label: { - en: "Bridge Matched" - zh: "Bridge 执行操作的次数" + en: "Matched" + zh: "匹配次数" } } - metric_success { + metric_queued { desc { - en: """Count of query success""" - zh: """Bridge 执行操作成功的次数""" + en: """Count of messages that are currently queued.""" + zh: """当前被缓存到磁盘队列的消息个数。""" } label: { - en: "Bridge Success" - zh: "Bridge 执行操作成功的次数" + en: "Queued" + zh: "被缓存" + } + } + metric_sent { + desc { + en: """Count of messages that are sent by this bridge.""" + zh: """已经发送出去的消息个数。""" + } + label: { + en: "Sent" + zh: "已发送" + } + } + metric_sent_exception { + desc { + en: """Count of messages that were sent but exceptions occur.""" + zh: """发送出现异常的消息个数。""" + } + label: { + en: "Sent Exception" + zh: "发送异常" } } - metric_failed { + metric_sent_failed { desc { - en: """Count of query failed""" - zh: """Bridge 执行操作失败的次数""" + en: """Count of messages that sent failed.""" + zh: """发送失败的消息个数。""" } label: { - en: "Bridge Failed" - zh: "Bridge 执行操作失败的次数" + en: "Sent Failed" + zh: "发送失败" + } + } + + metric_sent_inflight { + desc { + en: """Count of messages that were sent asynchronously but ACKs are not received.""" + zh: """已异步地发送但没有收到 ACK 的消息个数。""" + } + label: { + en: "Sent Inflight" + zh: "已发送未确认" + } + } + metric_sent_success { + desc { + en: """Count of messages that sent successfully.""" + zh: """已经发送成功的消息个数。""" + } + label: { + en: "Sent Success" + zh: "发送成功" } } @@ -144,6 +257,17 @@ emqx_bridge_schema { } } + metric_received { + desc { + en: """Count of messages that is received from the remote system.""" + zh: """从远程系统收到的消息个数。""" + } + label: { + en: "Received" + zh: "已接收" + } + } + desc_bridges { desc { en: """Configuration for MQTT bridges.""" diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl new file mode 100644 index 000000000..217c403b9 --- /dev/null +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -0,0 +1,89 @@ +-define(EMPTY_METRICS, + ?METRICS( + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + ) +). + +-define(METRICS( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX, + Rcvd +), + #{ + 'batched' => Batched, + 'dropped' => Dropped, + 'dropped.other' => DroppedOther, + 'dropped.queue_full' => DroppedQueueFull, + 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, + 'dropped.resource_not_found' => DroppedResourceNotFound, + 'dropped.resource_stopped' => DroppedResourceStopped, + 'matched' => Matched, + 'queued' => Queued, + 'sent' => Sent, + 'sent.exception' => SentExcpt, + 'sent.failed' => SentFailed, + 'sent.inflight' => SentInflight, + 'sent.success' => SentSucc, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX, + received => Rcvd + } +). + +-define(metrics( + Batched, + Dropped, + DroppedOther, + DroppedQueueFull, + DroppedQueueNotEnabled, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Sent, + SentExcpt, + SentFailed, + SentInflight, + SentSucc, + RATE, + RATE_5, + RATE_MAX, + Rcvd +), + #{ + 'batched' := Batched, + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'queued' := Queued, + 'sent' := Sent, + 'sent.exception' := SentExcpt, + 'sent.failed' := SentFailed, + 'sent.inflight' := SentInflight, + 'sent.success' := SentSucc, + rate := RATE, + rate_last5m := RATE_5, + rate_max := RATE_MAX, + received := Rcvd + } +). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 2f15bbcd7..ddffcb79f 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -20,6 +20,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, array/1, enum/1]). @@ -57,96 +58,6 @@ end ). --define(EMPTY_METRICS, - ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - ) -). - --define(METRICS( - Batched, - Dropped, - DroppedOther, - DroppedQueueFull, - DroppedQueueNotEnabled, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - Sent, - SentExcpt, - SentFailed, - SentInflight, - SentSucc, - RATE, - RATE_5, - RATE_MAX -), - #{ - 'batched' => Batched, - 'dropped' => Dropped, - 'dropped.other' => DroppedOther, - 'dropped.queue_full' => DroppedQueueFull, - 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, - 'dropped.resource_not_found' => DroppedResourceNotFound, - 'dropped.resource_stopped' => DroppedResourceStopped, - 'matched' => Matched, - 'queued' => Queued, - 'retried' => Retried, - 'sent' => Sent, - 'sent.exception' => SentExcpt, - 'sent.failed' => SentFailed, - 'sent.inflight' => SentInflight, - 'sent.success' => SentSucc, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX - } -). - --define(metrics( - Batched, - Dropped, - DroppedOther, - DroppedQueueFull, - DroppedQueueNotEnabled, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - Sent, - SentExcpt, - SentFailed, - SentInflight, - SentSucc, - RATE, - RATE_5, - RATE_MAX -), - #{ - 'batched' := Batched, - 'dropped' := Dropped, - 'dropped.other' := DroppedOther, - 'dropped.queue_full' := DroppedQueueFull, - 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, - 'dropped.resource_not_found' := DroppedResourceNotFound, - 'dropped.resource_stopped' := DroppedResourceStopped, - 'matched' := Matched, - 'queued' := Queued, - 'retried' := Retried, - 'sent' := Sent, - 'sent.exception' := SentExcpt, - 'sent.failed' := SentFailed, - 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc, - rate := RATE, - rate_last5m := RATE_5, - rate_max := RATE_MAX - } -). - namespace() -> "bridge". api_spec() -> @@ -770,12 +681,12 @@ format_metrics(#{ 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, 'queued' := Queued, - 'retried' := Retried, 'sent' := Sent, 'sent.exception' := SentExcpt, 'sent.failed' := SentFailed, 'sent.inflight' := SentInflight, - 'sent.success' := SentSucc + 'sent.success' := SentSucc, + 'received' := Rcvd }, rate := #{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} @@ -791,7 +702,6 @@ format_metrics(#{ DroppedResourceStopped, Matched, Queued, - Retried, Sent, SentExcpt, SentFailed, @@ -799,7 +709,8 @@ format_metrics(#{ SentSucc, Rate, Rate5m, - RateMax + RateMax, + Rcvd ). fill_defaults(Type, RawConf) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index aedfcaa03..f55ac840e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -102,16 +102,31 @@ fields(bridges) -> ] ++ ee_fields_bridges(); fields("metrics") -> [ + {"batched", mk(integer(), #{desc => ?DESC("metric_batched")})}, + {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, + {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, + {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, + {"dropped.queue_not_enabled", + mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})}, + {"dropped.resource_not_found", + mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})}, + {"dropped.resource_stopped", + mk(integer(), #{desc => ?DESC("metric_dropped_resource_stopped")})}, {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, - {"success", mk(integer(), #{desc => ?DESC("metric_success")})}, - {"failed", mk(integer(), #{desc => ?DESC("metric_failed")})}, + {"queued", mk(integer(), #{desc => ?DESC("metric_queued")})}, + {"sent", mk(integer(), #{desc => ?DESC("metric_sent")})}, + {"sent.exception", mk(integer(), #{desc => ?DESC("metric_sent_exception")})}, + {"sent.failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, + {"sent.inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, + {"sent.success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"rate", mk(float(), #{desc => ?DESC("metric_rate")})}, {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})}, {"rate_last5m", mk( float(), #{desc => ?DESC("metric_rate_last5m")} - )} + )}, + {"received", mk(float(), #{desc => ?DESC("metric_received")})} ]; fields("node_metrics") -> [ diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 27c101728..02b76d64b 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -66,15 +66,6 @@ } }). --define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{ - <<"matched">> := MATCH, - <<"success">> := SUCC, - <<"failed">> := FAILED, - <<"rate">> := SPEED, - <<"rate_last5m">> := SPEED5M, - <<"rate_max">> := SPEEDMAX -}). - inspect(Selected, _Envs, _Args) -> persistent_term:put(?MODULE, #{inspect => Selected}). @@ -185,6 +176,23 @@ t_mqtt_conn_bridge_ingress(_) -> end ), + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -237,9 +245,15 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := ?metrics(1, 1, 0, _, _, _), + <<"metrics">> := #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0}, <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"sent.success">> := 1, <<"sent.failed">> := 0} + } + ] }, jsx:decode(BridgeStr) ), @@ -337,6 +351,23 @@ t_ingress_mqtt_bridge_with_rules(_) -> persistent_term:get(?MODULE) ), + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []). @@ -433,9 +464,16 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), ?assertMatch( #{ - <<"metrics">> := ?metrics(2, 2, 0, _, _, _), + <<"metrics">> := #{<<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0}, <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"matched">> := 2, <<"sent.success">> := 2, <<"sent.failed">> := 0 + } + } + ] }, jsx:decode(BridgeStr) ), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index bdf43885a..3ce6925bc 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -135,8 +135,7 @@ drop_bridge(Name) -> %% When use this bridge as a data source, ?MODULE:on_message_received will be called %% if the bridge received msgs from the remote broker. on_message_received(Msg, HookPoint, ResId) -> - emqx_resource:inc_matched(ResId), - emqx_resource:inc_success(ResId), + emqx_resource:inc_received(ResId), emqx:run_hook(HookPoint, [Msg]). %% =================================================================== diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b07460643..c1d500e8b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -110,7 +110,7 @@ list_group_instances/1 ]). --export([inc_metrics_funcs/1, inc_matched/1, inc_success/1, inc_failed/1]). +-export([inc_received/1]). -optional_callbacks([ on_query/3, @@ -443,19 +443,8 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> %% ================================================================================= -inc_matched(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, matched). - -inc_success(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, success). - -inc_failed(ResId) -> - emqx_metrics_worker:inc(?RES_METRICS, ResId, failed). +inc_received(ResId) -> + emqx_metrics_worker:inc(?RES_METRICS, ResId, 'received'). filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. - -inc_metrics_funcs(ResId) -> - OnSucc = [{fun ?MODULE:inc_success/1, ResId}], - OnFailed = [{fun ?MODULE:inc_failed/1, ResId}], - {OnSucc, OnFailed}. diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index eaf0a9a5f..2581a3001 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -134,7 +134,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped', 'queued', 'batched', - 'retried', 'sent.success', 'sent.failed', 'sent.exception', @@ -143,7 +142,8 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', - 'dropped.other' + 'dropped.other', + 'received' ], [matched] ), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 013f430b1..3a12a6b91 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -131,7 +131,7 @@ init({Id, Index, Opts}) -> false -> undefined end, - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', queue_count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -254,7 +254,6 @@ retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = S case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'retried'), {keep_state, St0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> @@ -569,6 +568,11 @@ assert_ok_result(R) when is_tuple(R) -> assert_ok_result(R) -> error({not_ok_result, R}). +queue_count(undefined) -> + 0; +queue_count(Q) -> + replayq:count(Q). + -spec name(id(), integer()) -> atom(). name(Id, Index) -> Mod = atom_to_list(?MODULE), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 0bfa67d07..68b4fb6dd 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -268,7 +268,7 @@ t_query_counter_async_query(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ?assertMatch(#{matched := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ok = emqx_resource:remove_local(?ID). t_query_counter_async_callback(_) -> @@ -309,7 +309,7 @@ t_query_counter_async_callback(_) -> end ), {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), + ?assertMatch(#{matched := 1002, sent := 1002, 'sent.success' := 1002, 'sent.failed' := 0}, C), ?assertMatch(1000, ets:info(Tab0, size)), ?assert( lists:all( @@ -419,8 +419,8 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when - M >= Sent andalso M == S + E + F + RD, + #{matched := M, sent := St, 'sent.success' := Ss, dropped := D} when + St == Ss andalso M == St + D, C ), ?assert( diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index c333bb80e..9669e0113 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -62,14 +62,16 @@ test_rule(Sql, Select, Context, EventTopics) -> }, FullContext = fill_default_values(hd(EventTopics), emqx_rule_maps:atom_key_map(Context)), try emqx_rule_runtime:apply_rule(Rule, FullContext, #{}) of - {ok, Data} -> {ok, flatten(Data)}; - {error, Reason} -> {error, Reason} + {ok, Data} -> + {ok, flatten(Data)}; + {error, Reason} -> + {error, Reason} after ok = emqx_rule_engine:clear_metrics_for_rule(RuleId) end. get_selected_data(Selected, _Envs, _Args) -> - Selected. + {ok, Selected}. is_publish_topic(<<"$events/", _/binary>>) -> false; is_publish_topic(<<"$bridges/", _/binary>>) -> false; @@ -77,14 +79,14 @@ is_publish_topic(_Topic) -> true. flatten([]) -> []; -flatten([D1]) -> - D1; -flatten([D1 | L]) when is_list(D1) -> - D1 ++ flatten(L). +flatten([{ok, D}]) -> + D; +flatten([D | L]) when is_list(D) -> + [D0 || {ok, D0} <- D] ++ flatten(L). echo_action(Data, Envs) -> ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}), - Data. + {ok, Data}. fill_default_values(Event, Context) -> maps:merge(envs_examp(Event), Context).