From eb41b77de4bf54c12dadb307b38fe4fd2eb5cc62 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 19 Jul 2023 11:37:06 -0300 Subject: [PATCH] fix(rule_metrics): notify rule metrics of late replies and expired requests Fixes https://emqx.atlassian.net/browse/EMQX-10600 --- .../test/emqx_bridge_http_SUITE.erl | 71 +++++++++++++++++-- apps/emqx_resource/include/emqx_resource.hrl | 6 +- .../src/emqx_resource_buffer_worker.erl | 38 +++++++++- .../src/emqx_rule_runtime.erl | 2 +- changes/ce/fix-11306.en.md | 1 + 5 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 changes/ce/fix-11306.en.md diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 30a01cf6a..7b1c32bda 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -40,13 +40,13 @@ groups() -> init_per_suite(_Config) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), []. end_per_suite(_Config) -> - ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), @@ -77,13 +77,19 @@ init_per_testcase(t_too_many_requests, Config) -> ), ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()), [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config]; +init_per_testcase(t_rule_action_expired, Config) -> + [ + {bridge_name, ?BRIDGE_NAME} + | Config + ]; init_per_testcase(_TestCase, Config) -> Server = start_http_server(#{response_delay_ms => 0}), [{http_server, Server} | Config]. end_per_testcase(TestCase, _Config) when TestCase =:= t_path_not_found; - TestCase =:= t_too_many_requests + TestCase =:= t_too_many_requests; + TestCase =:= t_rule_action_expired -> ok = emqx_bridge_http_connector_test_server:stop(), persistent_term:erase({?MODULE, times_called}), @@ -202,6 +208,7 @@ parse_http_request_assertive(ReqStr0) -> bridge_async_config(#{port := Port} = Config) -> Type = maps:get(type, Config, ?BRIDGE_TYPE), Name = maps:get(name, Config, ?BRIDGE_NAME), + Host = maps:get(host, Config, "localhost"), Path = maps:get(path, Config, ""), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), @@ -218,7 +225,7 @@ bridge_async_config(#{port := Port} = Config) -> end, ConfigString = io_lib:format( "bridges.~s.~s {\n" - " url = \"http://localhost:~p~s\"\n" + " url = \"http://~s:~p~s\"\n" " connect_timeout = \"~p\"\n" " enable = true\n" %% local_topic @@ -248,6 +255,7 @@ bridge_async_config(#{port := Port} = Config) -> [ Type, Name, + Host, Port, Path, ConnectTimeout, @@ -540,6 +548,61 @@ t_too_many_requests(Config) -> ), ok. +t_rule_action_expired(Config) -> + ?check_trace( + begin + RuleTopic = <<"t/webhook/rule">>, + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + host => "non.existent.host", + port => 9999, + path => <<"/some/path">>, + resume_interval => "100ms", + connect_timeout => "1s", + request_timeout => "100ms", + resource_request_ttl => "100ms" + }), + {ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig), + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_testlib:create_rule_and_action_http(?BRIDGE_TYPE, RuleTopic, Config), + Msg = emqx_message:make(RuleTopic, <<"timeout">>), + emqx:publish(Msg), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + failed := 0, + dropped := 1 + } + }, + emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME) + ) + ), + ?retry( + _Interval = 500, + _NAttempts = 20, + ?assertMatch( + #{ + counters := #{ + matched := 1, + 'actions.failed' := 1, + 'actions.failed.unknown' := 1, + 'actions.total' := 1 + } + }, + emqx_metrics_worker:get_metrics(rule_metrics, RuleId) + ) + ), + ok + end, + [] + ), + ok. + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 10dc001c2..6a90a1e0a 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -24,7 +24,11 @@ -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync | simple_async | sync | async | no_queries. -type result() :: term(). --type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. +-type reply_fun() :: + {fun((result(), Args :: term()) -> any()), Args :: term()} + | {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()} + | undefined. +-type reply_context() :: #{reply_dropped => boolean()}. -type query_opts() :: #{ %% The key used for picking a resource worker pick_key => term(), diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 5f352e181..279a141f5 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -366,6 +366,7 @@ resume_from_blocked(Data) -> true -> #{dropped_expired => length(Batch)}; false -> #{} end, + batch_reply_dropped(Batch, {error, request_expired}), NData = aggregate_counters(Data, Counters), ?tp(buffer_worker_retry_expired, #{expired => Batch}), resume_from_blocked(NData); @@ -378,6 +379,7 @@ resume_from_blocked(Data) -> {batch, Ref, NotExpired, Expired} -> NumExpired = length(Expired), ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired), + batch_reply_dropped(Expired, {error, request_expired}), NData = aggregate_counters(Data, #{dropped_expired => NumExpired}), ?tp(buffer_worker_retry_expired, #{expired => Expired}), %% We retry msgs in inflight window sync, as if we send them @@ -484,6 +486,9 @@ do_reply_caller({F, Args}, {async_return, Result}) -> %% decision has to be made by the caller do_reply_caller({F, Args}, Result); do_reply_caller({F, Args}, Result) when is_function(F) -> + _ = erlang:apply(F, Args ++ [Result]), + ok; +do_reply_caller({F, Args, _Context}, Result) when is_function(F) -> _ = erlang:apply(F, Args ++ [Result]), ok. @@ -537,11 +542,13 @@ flush(Data0) -> {[], _AllExpired} -> ok = replayq:ack(Q1, QAckRef), NumExpired = length(Batch), + batch_reply_dropped(Batch, {error, request_expired}), Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}), ?tp(buffer_worker_flush_all_expired, #{batch => Batch}), flush(Data3); {NotExpired, Expired} -> NumExpired = length(Expired), + batch_reply_dropped(Expired, {error, request_expired}), Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}), IsBatch = (BatchSize > 1), %% We *must* use the new queue, because we currently can't @@ -809,6 +816,28 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) end, {ShouldAck, PostFn, DeltaCounters}. +%% This is basically used only by rule actions. To avoid rule action metrics from +%% becoming inconsistent when we drop messages, we need a way to signal rule engine that +%% this action has reached a conclusion. +-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok. +reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when + is_function(Fn), is_list(Args) +-> + %% We want to avoid bumping metrics inside the buffer worker, since it's costly. + spawn(fun() -> erlang:apply(Fn, Args ++ [Result]) end), + ok; +reply_dropped(_ReplyTo, _Result) -> + ok. + +-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok. +batch_reply_dropped(Batch, Result) -> + lists:foreach( + fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) -> + reply_dropped(ReplyTo, Result) + end, + Batch + ). + %% This is only called by `simple_{,a}sync_query', so we can bump the %% counters here. handle_query_result(Id, Result, HasBeenSent) -> @@ -1164,7 +1193,7 @@ handle_async_reply1( inflight_tid := InflightTID, resource_id := Id, buffer_worker := BufferWorkerPid, - min_query := ?QUERY(_, _, _, ExpireAt) = _Query + min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query } = ReplyContext, Result ) -> @@ -1178,7 +1207,11 @@ handle_async_reply1( IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid), %% evalutate metrics call here since we're not inside %% buffer worker - IsAcked andalso emqx_resource_metrics:late_reply_inc(Id), + IsAcked andalso + begin + emqx_resource_metrics:late_reply_inc(Id), + reply_dropped(ReplyTo, {error, late_reply}) + end, ?tp(handle_async_reply_expired, #{expired => [_Query]}), ok; false -> @@ -1292,6 +1325,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) -> %% evalutate metrics call here since we're not inside buffer %% worker emqx_resource_metrics:late_reply_inc(Id, NumExpired), + batch_reply_dropped(RealExpired, {error, late_reply}), case RealNotExpired of [] -> %% all expired, no need to update back the inflight batch diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index de1e92a3f..d62803d7e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -350,7 +350,7 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env "bridge_action", #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} ), - ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]}, + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}}, case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo}) of diff --git a/changes/ce/fix-11306.en.md b/changes/ce/fix-11306.en.md new file mode 100644 index 000000000..519124c3d --- /dev/null +++ b/changes/ce/fix-11306.en.md @@ -0,0 +1 @@ +Fixed rule action metrics inconsistency where dropped requests were not accounted for.