diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 9bdc1b3c2..c1692b9af 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -51,7 +51,7 @@ -export([ send_message/2, - send_message/4 + send_message/5 ]). -export([config_key_path/0]). @@ -220,14 +220,14 @@ send_to_matched_egress_bridges(Topic, Msg) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - send_message(BridgeType, BridgeName, ResId, Message). + send_message(BridgeType, BridgeName, ResId, Message, #{}). -send_message(BridgeType, BridgeName, ResId, Message) -> +send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) -> case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of not_found -> {error, bridge_not_found}; #{enable := true} = Config -> - QueryOpts = query_opts(Config), + QueryOpts = maps:merge(query_opts(Config), QueryOpts0), emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> {error, bridge_stopped} diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 45bb5e4dc..bd3de3561 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -96,11 +96,10 @@ delete_all_bridges() -> ). %% test helpers -parse_and_check(Config, ConfigString, Name) -> - BridgeType = ?config(bridge_type, Config), +parse_and_check(BridgeType, BridgeName, ConfigString) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf, + #{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf, BridgeConfig. resource_id(Config) -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index f26e4037b..dfd5fd07c 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -119,13 +119,14 @@ bridge_config(TestCase, _TestGroup, Config) -> Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), Version = ?config(iotdb_version, Config), + Type = ?config(bridge_type, Config), Name = << (atom_to_binary(TestCase))/binary, UniqueNum/binary >>, ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( - "bridges.iotdb.~s {\n" + "bridges.~s.~s {\n" " enable = true\n" " base_url = \"~s\"\n" " authentication = {\n" @@ -142,12 +143,13 @@ bridge_config(TestCase, _TestGroup, Config) -> " }\n" "}\n", [ + Type, Name, ServerURL, Version ] ), - {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. + {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}. make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5d53234f7..41e5673d9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -32,7 +32,8 @@ expire_at => infinity | integer(), async_reply_fun => reply_fun(), simple_query => boolean(), - is_buffer_supported => boolean() + is_buffer_supported => boolean(), + reply_to => reply_fun() }. -type resource_data() :: #{ id := resource_id(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2b3dbe2aa..d0d93a701 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -292,7 +292,7 @@ query(ResId, Request, Opts) -> {simple_sync, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again - emqx_resource_buffer_worker:simple_sync_query(ResId, Request); + emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); {sync, _} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {async, _} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8f93e0def..5f352e181 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -38,6 +38,7 @@ -export([ simple_sync_query/2, + simple_sync_query/3, simple_async_query/3 ]). @@ -61,7 +62,7 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). --define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). +-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} @@ -133,6 +134,10 @@ async_query(Id, Request, Opts0) -> %% simple query the resource without batching and queuing. -spec simple_sync_query(id(), request()) -> term(). simple_sync_query(Id, Request) -> + simple_sync_query(Id, Request, #{}). + +-spec simple_sync_query(id(), request(), query_opts()) -> term(). +simple_sync_query(Id, Request, QueryOpts0) -> %% Note: since calling this function implies in bypassing the %% buffer workers, and each buffer worker index is used when %% collecting gauge metrics, we use this dummy index. If this @@ -141,10 +146,11 @@ simple_sync_query(Id, Request) -> %% `emqx_resource_metrics:*_shift/3'. ?tp(simple_sync_query, #{id => Id, request => Request}), Index = undefined, - QueryOpts = simple_query_opts(), + QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -156,7 +162,10 @@ simple_async_query(Id, Request, QueryOpts0) -> QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + Result = call_query( + async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts + ), _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. @@ -308,7 +317,7 @@ code_change(_OldVsn, State, _Extra) -> end ). -pick_call(Id, Key, Query, Timeout) -> +pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) -> ?PICK(Id, Key, Pid, begin MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), ReplyTo = {fun ?MODULE:reply_call/2, [MRef]}, @@ -316,23 +325,23 @@ pick_call(Id, Key, Query, Timeout) -> receive {MRef, Response} -> erlang:demonitor(MRef, [flush]), - Response; + maybe_reply_to(Response, QueryOpts); {'DOWN', MRef, process, Pid, Reason} -> error({worker_down, Reason}) after Timeout -> erlang:demonitor(MRef, [flush]), receive {MRef, Response} -> - Response + maybe_reply_to(Response, QueryOpts) after 0 -> error(timeout) end end end). -pick_cast(Id, Key, Query) -> +pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) -> ?PICK(Id, Key, Pid, begin - ReplyTo = undefined, + ReplyTo = maps:get(reply_to, QueryOpts, undefined), erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), ok end). @@ -1051,9 +1060,14 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun( + sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts +) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), - ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); + maybe_reply_to( + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), + QueryOpts + ); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1081,12 +1095,17 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re end, Request ); -apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun( + sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts +) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), - ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); + maybe_reply_to( + ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), + QueryOpts + ); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async @@ -1118,6 +1137,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re Batch ). +maybe_reply_to(Result, #{reply_to := ReplyTo}) -> + do_reply_caller(ReplyTo, Result), + Result; +maybe_reply_to(Result, _) -> + Result. + handle_async_reply( #{ request_ref := Ref, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index c0afa0939..4a1477a1c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -23,7 +23,8 @@ -export([ apply_rule/3, apply_rules/3, - clear_rule_payload/0 + clear_rule_payload/0, + inc_action_metrics/2 ]). -import( @@ -323,9 +324,7 @@ handle_action_list(RuleId, Actions, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), try - Result = do_handle_action(ActId, Selected, Envs), - inc_action_metrics(Result, RuleId), - Result + do_handle_action(RuleId, ActId, Selected, Envs) catch throw:out_of_service -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), @@ -345,21 +344,29 @@ handle_action(RuleId, ActId, Selected, Envs) -> }) end. -do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> +-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target). +do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> ?TRACE( "BRIDGE", "bridge_action", #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} ), - case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]}, + case + emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo}) + of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); + ?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) -> + throw(out_of_service); Result -> Result end; -do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> +do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, Envs) -> %% the function can also throw 'out_of_service' - Mod:Func(Selected, Envs, Args). + Result = Mod:Func(Selected, Envs, Args), + inc_action_metrics(RuleId, Result), + Result. eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); @@ -512,14 +519,15 @@ nested_put(Alias, Val, Columns0) -> Columns = handle_alias(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). --define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). -inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> +inc_action_metrics(RuleId, Result) -> + _ = do_inc_action_metrics(RuleId, Result), + Result. + +do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) -> +do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'); -inc_action_metrics(R, RuleId) -> +do_inc_action_metrics(RuleId, R) -> case is_ok_result(R) of false -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index c9feda601..f05159e30 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -19,12 +19,11 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx_rule_engine/include/rule_engine.hrl"). --include_lib("emqx/include/emqx.hrl"). - -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx.hrl"). + -import(emqx_common_test_helpers, [on_exit/1]). %%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())). @@ -38,7 +37,11 @@ all() -> {group, runtime}, {group, events}, {group, telemetry}, - {group, bugs} + {group, bugs}, + {group, metrics}, + {group, metrics_simple}, + {group, metrics_fail}, + {group, metrics_fail_simple} ]. suite() -> @@ -116,6 +119,22 @@ groups() -> {bugs, [], [ t_sqlparse_payload_as, t_sqlparse_nested_get + ]}, + {metrics, [], [ + t_rule_metrics_sync, + t_rule_metrics_async + ]}, + {metrics_simple, [], [ + t_rule_metrics_sync, + t_rule_metrics_async + ]}, + {metrics_fail, [], [ + t_rule_metrics_sync_fail, + t_rule_metrics_async_fail + ]}, + {metrics_fail_simple, [], [ + t_rule_metrics_sync_fail, + t_rule_metrics_async_fail ]} ]. @@ -128,7 +147,7 @@ init_per_suite(Config) -> emqx_rule_funcs_demo:module_info(), application:load(emqx_conf), ok = emqx_common_test_helpers:start_apps( - [emqx_conf, emqx_rule_engine, emqx_authz], + [emqx_conf, emqx_rule_engine, emqx_authz, emqx_bridge], fun set_special_configs/1 ), Config. @@ -160,14 +179,41 @@ on_get_resource_status(_id, _) -> #{}. group(_Groupname) -> []. +-define(BRIDGE_IMPL, emqx_bridge_mqtt_connector). init_per_group(registry, Config) -> Config; +init_per_group(metrics_fail, Config) -> + meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), + meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}), + [{mecked, [?BRIDGE_IMPL]} | Config]; +init_per_group(metrics_simple, Config) -> + meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]), + meck:expect(?BRIDGE_IMPL, query_mode, fun + (#{resource_opts := #{query_mode := sync}}) -> simple_sync; + (_) -> simple_async + end), + [{mecked, [?BRIDGE_IMPL]} | Config]; +init_per_group(metrics_fail_simple, Config) -> + meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]), + meck:expect(?BRIDGE_IMPL, query_mode, fun + (#{resource_opts := #{query_mode := sync}}) -> simple_sync; + (_) -> simple_async + end), + meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), + meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) -> + Result = {error, {unrecoverable_error, mecked_failure}}, + erlang:apply(ReplyFun, Args ++ [Result]), + Result + end), + [{mecked, [?BRIDGE_IMPL]} | Config]; init_per_group(_Groupname, Config) -> Config. -end_per_group(_Groupname, _Config) -> - ok. - +end_per_group(_Groupname, Config) -> + case ?config(mecked, Config) of + undefined -> ok; + Mecked -> meck:unload(Mecked) + end. %%------------------------------------------------------------------------------ %% Testcase specific setup/teardown %%------------------------------------------------------------------------------ @@ -2822,6 +2868,114 @@ t_get_rule_ids_by_action_reference_ingress_bridge(_Config) -> ), ok. +%%------------------------------------------------------------------------------ +%% Test cases for rule metrics +%%------------------------------------------------------------------------------ + +-define(BRIDGE_TYPE, <<"mqtt">>). +-define(BRIDGE_NAME, <<"bridge_over_troubled_water">>). +-define(BRIDGE_CONFIG(QMODE), #{ + <<"server">> => <<"127.0.0.1:1883">>, + <<"username">> => <<"user1">>, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v4">>, + <<"ssl">> => #{<<"enable">> => false}, + <<"egress">> => + #{ + <<"local">> => + #{ + <<"topic">> => <<"foo/#">> + }, + <<"remote">> => + #{ + <<"topic">> => <<"bar/${topic}">>, + <<"payload">> => <<"${payload}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } + }, + <<"resource_opts">> => + #{ + <<"health_check_interval">> => <<"5s">>, + <<"query_mode">> => QMODE, + <<"request_ttl">> => <<"3s">>, + <<"worker_pool_size">> => 1 + } +}). + +-define(SUCCESSS_METRICS, #{ + matched := 1, + 'actions.total' := 1, + 'actions.failed' := 0, + 'actions.success' := 1 +}). +-define(FAIL_METRICS, #{ + matched := 1, + 'actions.total' := 1, + 'actions.failed' := 1, + 'actions.success' := 0 +}). + +t_rule_metrics_sync(_Config) -> + do_test_rule_metrics_success(<<"sync">>). + +t_rule_metrics_async(_Config) -> + do_test_rule_metrics_success(<<"async">>). + +t_rule_metrics_sync_fail(_Config) -> + do_test_rule_metrics_fail(<<"sync">>). + +t_rule_metrics_async_fail(_Config) -> + do_test_rule_metrics_fail(<<"async">>). + +do_test_rule_metrics_success(QMode) -> + ?assertMatch( + ?SUCCESSS_METRICS, + do_test_rule_metrics(QMode) + ). + +do_test_rule_metrics_fail(QMode) -> + ?assertMatch( + ?FAIL_METRICS, + do_test_rule_metrics(QMode) + ). + +do_test_rule_metrics(QMode) -> + BridgeId = create_bridge(?BRIDGE_TYPE, ?BRIDGE_NAME, ?BRIDGE_CONFIG(QMode)), + RuleId = <<"rule:test_metrics_bridge_action">>, + {ok, #{id := RuleId}} = + emqx_rule_engine:create_rule( + #{ + id => RuleId, + sql => <<"SELECT * FROM \"topic/#\"">>, + actions => [BridgeId] + } + ), + timer:sleep(100), + ?assertMatch( + #{ + matched := 0, + 'actions.total' := 0, + 'actions.failed' := 0, + 'actions.success' := 0 + }, + emqx_metrics_worker:get_counters(rule_metrics, RuleId) + ), + MsgId = emqx_guid:gen(), + emqx:publish(#message{id = MsgId, topic = <<"topic/test">>, payload = <<"hello">>}), + timer:sleep(100), + on_exit( + fun() -> + emqx_rule_engine:delete_rule(RuleId), + emqx_bridge:remove(?BRIDGE_TYPE, ?BRIDGE_NAME) + end + ), + emqx_metrics_worker:get_counters(rule_metrics, RuleId). + +create_bridge(Type, Name, Config) -> + {ok, _Bridge} = emqx_bridge:create(Type, Name, Config), + emqx_bridge_resource:bridge_id(Type, Name). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-11126.en.md b/changes/ce/fix-11126.en.md new file mode 100644 index 000000000..f83008ebd --- /dev/null +++ b/changes/ce/fix-11126.en.md @@ -0,0 +1 @@ +Rule metrics for async mode bridges will set failure counters correctly now.