From 837acd42346361881404823d4d25082f48b83658 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 28 Jun 2023 14:03:42 +0200 Subject: [PATCH] test(emqx_rule_engine): test rule metrics --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 5 +- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 6 +- .../test/emqx_rule_engine_SUITE.erl | 169 +++++++++++++++++- 3 files changed, 167 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 62ba70b33..35691efe3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -96,11 +96,10 @@ delete_all_bridges() -> ). %% test helpers -parse_and_check(Config, ConfigString, Name) -> - BridgeType = ?config(bridge_type, Config), +parse_and_check(BridgeType, BridgeName, ConfigString) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf, + #{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf, BridgeConfig. resource_id(Config) -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index f26e4037b..dfd5fd07c 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -119,13 +119,14 @@ bridge_config(TestCase, _TestGroup, Config) -> Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), Version = ?config(iotdb_version, Config), + Type = ?config(bridge_type, Config), Name = << (atom_to_binary(TestCase))/binary, UniqueNum/binary >>, ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( - "bridges.iotdb.~s {\n" + "bridges.~s.~s {\n" " enable = true\n" " base_url = \"~s\"\n" " authentication = {\n" @@ -142,12 +143,13 @@ bridge_config(TestCase, _TestGroup, Config) -> " }\n" "}\n", [ + Type, Name, ServerURL, Version ] ), - {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. + {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}. make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c9feda601..b34228e0d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -19,12 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_rule_engine/include/rule_engine.hrl"). --include_lib("emqx/include/emqx.hrl"). - -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx.hrl"). + -import(emqx_common_test_helpers, [on_exit/1]). %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). @@ -38,7 +37,11 @@ all() -> {group, runtime}, {group, events}, {group, telemetry}, - {group, bugs} + {group, bugs}, + {group, metrics}, + {group, metrics_simple}, + {group, metrics_fail}, + {group, metrics_fail_simple} ]. suite() -> @@ -116,6 +119,22 @@ groups() -> {bugs, [], [ t_sqlparse_payload_as, t_sqlparse_nested_get + ]}, + {metrics, [], [ + t_rule_metrics_sync, + t_rule_metrics_async + ]}, + {metrics_simple, [], [ + t_rule_metrics_sync, + t_rule_metrics_async + ]}, + {metrics_fail, [], [ + t_rule_metrics_sync_fail, + t_rule_metrics_async_fail + ]}, + {metrics_fail_simple, [], [ + t_rule_metrics_sync_fail, + t_rule_metrics_async_fail ]} ]. @@ -128,7 +147,7 @@ init_per_suite(Config) -> emqx_rule_funcs_demo:module_info(), application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_rule_engine, emqx_authz], + [emqx_conf, emqx_rule_engine, emqx_authz, emqx_bridge], fun set_special_configs/1 ), Config. @@ -160,14 +179,37 @@ on_get_resource_status(_id, _) -> #{}. group(_Groupname) -> []. +-define(BRIDGE_IMPL, emqx_bridge_mqtt_connector). init_per_group(registry, Config) -> Config; +init_per_group(metrics_fail, Config) -> + meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), + meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}), + [{mecked, [?BRIDGE_IMPL]} | Config]; +init_per_group(metrics_simple, Config) -> + meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]), + meck:expect(?BRIDGE_IMPL, query_mode, fun + (#{mqtt := #{query_mode := sync}}) -> simple_sync; + (_) -> simple_async + end), + [{mecked, [?BRIDGE_IMPL]} | Config]; +init_per_group(metrics_fail_simple, Config) -> + meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]), + meck:expect(?BRIDGE_IMPL, query_mode, fun + (#{mqtt := #{query_mode := sync}}) -> simple_sync; + (_) -> simple_async + end), + meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), + meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}), + [{mecked, [?BRIDGE_IMPL]} | Config]; init_per_group(_Groupname, Config) -> Config. -end_per_group(_Groupname, _Config) -> - ok. - +end_per_group(_Groupname, Config) -> + case ?config(mecked, Config) of + undefined -> ok; + Mecked -> meck:unload(Mecked) + end. %%------------------------------------------------------------------------------ %% Testcase specific setup/teardown %%------------------------------------------------------------------------------ @@ -2822,6 +2864,117 @@ t_get_rule_ids_by_action_reference_ingress_bridge(_Config) -> ), ok. +%%------------------------------------------------------------------------------ +%% Test cases for rule metrics +%%------------------------------------------------------------------------------ + +-define(BRIDGE_TYPE, <<"mqtt">>). +-define(BRIDGE_NAME, <<"bridge_over_troubled_water">>). +-define(BRIDGE_CONFIG(QMODE), #{ + <<"server">> => <<"127.0.0.1:1883">>, + <<"username">> => <<"user1">>, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v4">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"egress">> => + #{ + <<"local">> => + #{ + <<"topic">> => <<"foo/#">> + }, + <<"remote">> => + #{ + <<"topic">> => <<"bar/${topic}">>, + <<"payload">> => <<"${payload}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"5s">>, + <<"query_mode">> => QMODE, + <<"request_ttl">> => <<"3s">>, + <<"worker_pool_size">> => 1 + } +}). + +-define(SUCCESSS_METRICS, #{ + matched := 1, + 'actions.total' := 1, + 'actions.failed' := 0, + 'actions.success' := 1 +}). +-define(FAIL_METRICS, #{ + matched := 1, + 'actions.total' := 1, + 'actions.failed' := 1, + 'actions.success' := 0 +}). + +t_rule_metrics_sync(_Config) -> + do_test_rule_metrics_success(<<"sync">>). + +t_rule_metrics_async(_Config) -> + do_test_rule_metrics_success(<<"async">>). + +t_rule_metrics_sync_fail(_Config) -> + do_test_rule_metrics_fail(<<"sync">>). + +t_rule_metrics_async_fail(_Config) -> + do_test_rule_metrics_fail(<<"async">>). + +do_test_rule_metrics_success(QMode) -> + ?assertMatch( + ?SUCCESSS_METRICS, + do_test_rule_metrics(QMode) + ). + +do_test_rule_metrics_fail(QMode) -> + ?assertMatch( + ?FAIL_METRICS, + do_test_rule_metrics(QMode) + ). + +do_test_rule_metrics(QMode) -> + BridgeId = create_bridge(?BRIDGE_TYPE, ?BRIDGE_NAME, ?BRIDGE_CONFIG(QMode)), + RuleId = <<"rule:test_metrics_bridge_action">>, + {ok, #{id := RuleId}} = + emqx_rule_engine:create_rule( + #{ + id => RuleId, + sql => <<"SELECT * FROM \"topic/#\"">>, + actions => [BridgeId] + } + ), + timer:sleep(100), + ?assertMatch( + #{ + matched := 0, + 'actions.total' := 0, + 'actions.failed' := 0, + 'actions.success' := 0 + }, + emqx_metrics_worker:get_counters(rule_metrics, RuleId) + ), + MsgId = emqx_guid:gen(), + emqx:publish(#message{id = MsgId, topic = <<"topic/test">>, payload = <<"hello">>}), + timer:sleep(100), + ct:pal("bridge metrics: ~p", [ + emqx_resource:get_metrics(emqx_bridge_resource:resource_id(BridgeId)) + ]), + on_exit( + fun() -> + emqx_rule_engine:delete_rule(RuleId), + emqx_bridge:remove(?BRIDGE_TYPE, ?BRIDGE_NAME) + end + ), + emqx_metrics_worker:get_counters(rule_metrics, RuleId). + +create_bridge(Type, Name, Config) -> + {ok, _Bridge} = emqx_bridge:create(Type, Name, Config), + emqx_bridge_resource:bridge_id(Type, Name). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------