test(emqx_rule_engine): test rule metrics

This commit is contained in:
Stefan Strigler 2023-06-28 14:03:42 +02:00
parent 2274a192cc
commit 837acd4234
3 changed files with 167 additions and 13 deletions

View File

@ -96,11 +96,10 @@ delete_all_bridges() ->
). ).
%% test helpers %% test helpers
parse_and_check(Config, ConfigString, Name) -> parse_and_check(BridgeType, BridgeName, ConfigString) ->
BridgeType = ?config(bridge_type, Config),
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf, #{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf,
BridgeConfig. BridgeConfig.
resource_id(Config) -> resource_id(Config) ->

View File

@ -119,13 +119,14 @@ bridge_config(TestCase, _TestGroup, Config) ->
Host = ?config(bridge_host, Config), Host = ?config(bridge_host, Config),
Port = ?config(bridge_port, Config), Port = ?config(bridge_port, Config),
Version = ?config(iotdb_version, Config), Version = ?config(iotdb_version, Config),
Type = ?config(bridge_type, Config),
Name = << Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary (atom_to_binary(TestCase))/binary, UniqueNum/binary
>>, >>,
ServerURL = iotdb_server_url(Host, Port), ServerURL = iotdb_server_url(Host, Port),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.iotdb.~s {\n" "bridges.~s.~s {\n"
" enable = true\n" " enable = true\n"
" base_url = \"~s\"\n" " base_url = \"~s\"\n"
" authentication = {\n" " authentication = {\n"
@ -142,12 +143,13 @@ bridge_config(TestCase, _TestGroup, Config) ->
" }\n" " }\n"
"}\n", "}\n",
[ [
Type,
Name, Name,
ServerURL, ServerURL,
Version Version
] ]
), ),
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}.
make_iotdb_payload(DeviceId, Measurement, Type, Value) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
#{ #{

View File

@ -19,12 +19,11 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx.hrl").
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
@ -38,7 +37,11 @@ all() ->
{group, runtime}, {group, runtime},
{group, events}, {group, events},
{group, telemetry}, {group, telemetry},
{group, bugs} {group, bugs},
{group, metrics},
{group, metrics_simple},
{group, metrics_fail},
{group, metrics_fail_simple}
]. ].
suite() -> suite() ->
@ -116,6 +119,22 @@ groups() ->
{bugs, [], [ {bugs, [], [
t_sqlparse_payload_as, t_sqlparse_payload_as,
t_sqlparse_nested_get t_sqlparse_nested_get
]},
{metrics, [], [
t_rule_metrics_sync,
t_rule_metrics_async
]},
{metrics_simple, [], [
t_rule_metrics_sync,
t_rule_metrics_async
]},
{metrics_fail, [], [
t_rule_metrics_sync_fail,
t_rule_metrics_async_fail
]},
{metrics_fail_simple, [], [
t_rule_metrics_sync_fail,
t_rule_metrics_async_fail
]} ]}
]. ].
@ -128,7 +147,7 @@ init_per_suite(Config) ->
emqx_rule_funcs_demo:module_info(), emqx_rule_funcs_demo:module_info(),
application:load(emqx_conf), application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps( ok = emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_rule_engine, emqx_authz], [emqx_conf, emqx_rule_engine, emqx_authz, emqx_bridge],
fun set_special_configs/1 fun set_special_configs/1
), ),
Config. Config.
@ -160,14 +179,37 @@ on_get_resource_status(_id, _) -> #{}.
group(_Groupname) -> group(_Groupname) ->
[]. [].
-define(BRIDGE_IMPL, emqx_bridge_mqtt_connector).
init_per_group(registry, Config) -> init_per_group(registry, Config) ->
Config; Config;
init_per_group(metrics_fail, Config) ->
meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}),
[{mecked, [?BRIDGE_IMPL]} | Config];
init_per_group(metrics_simple, Config) ->
meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
meck:expect(?BRIDGE_IMPL, query_mode, fun
(#{mqtt := #{query_mode := sync}}) -> simple_sync;
(_) -> simple_async
end),
[{mecked, [?BRIDGE_IMPL]} | Config];
init_per_group(metrics_fail_simple, Config) ->
meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
meck:expect(?BRIDGE_IMPL, query_mode, fun
(#{mqtt := #{query_mode := sync}}) -> simple_sync;
(_) -> simple_async
end),
meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}),
[{mecked, [?BRIDGE_IMPL]} | Config];
init_per_group(_Groupname, Config) -> init_per_group(_Groupname, Config) ->
Config. Config.
end_per_group(_Groupname, _Config) -> end_per_group(_Groupname, Config) ->
ok. case ?config(mecked, Config) of
undefined -> ok;
Mecked -> meck:unload(Mecked)
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcase specific setup/teardown %% Testcase specific setup/teardown
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -2822,6 +2864,117 @@ t_get_rule_ids_by_action_reference_ingress_bridge(_Config) ->
), ),
ok. ok.
%%------------------------------------------------------------------------------
%% Test cases for rule metrics
%%------------------------------------------------------------------------------
-define(BRIDGE_TYPE, <<"mqtt">>).
-define(BRIDGE_NAME, <<"bridge_over_troubled_water">>).
-define(BRIDGE_CONFIG(QMODE), #{
<<"server">> => <<"127.0.0.1:1883">>,
<<"username">> => <<"user1">>,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v4">>,
<<"ssl">> => #{<<"enable">> => false},
<<"egress">> =>
#{
<<"local">> =>
#{
<<"topic">> => <<"foo/#">>
},
<<"remote">> =>
#{
<<"topic">> => <<"bar/${topic}">>,
<<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}
},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"5s">>,
<<"query_mode">> => QMODE,
<<"request_ttl">> => <<"3s">>,
<<"worker_pool_size">> => 1
}
}).
-define(SUCCESSS_METRICS, #{
matched := 1,
'actions.total' := 1,
'actions.failed' := 0,
'actions.success' := 1
}).
-define(FAIL_METRICS, #{
matched := 1,
'actions.total' := 1,
'actions.failed' := 1,
'actions.success' := 0
}).
t_rule_metrics_sync(_Config) ->
do_test_rule_metrics_success(<<"sync">>).
t_rule_metrics_async(_Config) ->
do_test_rule_metrics_success(<<"async">>).
t_rule_metrics_sync_fail(_Config) ->
do_test_rule_metrics_fail(<<"sync">>).
t_rule_metrics_async_fail(_Config) ->
do_test_rule_metrics_fail(<<"async">>).
do_test_rule_metrics_success(QMode) ->
?assertMatch(
?SUCCESSS_METRICS,
do_test_rule_metrics(QMode)
).
do_test_rule_metrics_fail(QMode) ->
?assertMatch(
?FAIL_METRICS,
do_test_rule_metrics(QMode)
).
do_test_rule_metrics(QMode) ->
BridgeId = create_bridge(?BRIDGE_TYPE, ?BRIDGE_NAME, ?BRIDGE_CONFIG(QMode)),
RuleId = <<"rule:test_metrics_bridge_action">>,
{ok, #{id := RuleId}} =
emqx_rule_engine:create_rule(
#{
id => RuleId,
sql => <<"SELECT * FROM \"topic/#\"">>,
actions => [BridgeId]
}
),
timer:sleep(100),
?assertMatch(
#{
matched := 0,
'actions.total' := 0,
'actions.failed' := 0,
'actions.success' := 0
},
emqx_metrics_worker:get_counters(rule_metrics, RuleId)
),
MsgId = emqx_guid:gen(),
emqx:publish(#message{id = MsgId, topic = <<"topic/test">>, payload = <<"hello">>}),
timer:sleep(100),
ct:pal("bridge metrics: ~p", [
emqx_resource:get_metrics(emqx_bridge_resource:resource_id(BridgeId))
]),
on_exit(
fun() ->
emqx_rule_engine:delete_rule(RuleId),
emqx_bridge:remove(?BRIDGE_TYPE, ?BRIDGE_NAME)
end
),
emqx_metrics_worker:get_counters(rule_metrics, RuleId).
create_bridge(Type, Name, Config) ->
{ok, _Bridge} = emqx_bridge:create(Type, Name, Config),
emqx_bridge_resource:bridge_id(Type, Name).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal helpers %% Internal helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------