Merge pull request #11306 from thalesmg/rule-actions-reply-dropped-r51-20230719
fix(rule_metrics): notify rule metrics of late replies and expired requests
This commit is contained in:
commit
2531c3e7d1
|
@ -40,13 +40,13 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(_Config) ->
|
init_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||||
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
|
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
|
||||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
[].
|
[].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
|
ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
|
||||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
_ = application:stop(emqx_connector),
|
_ = application:stop(emqx_connector),
|
||||||
_ = application:stop(emqx_bridge),
|
_ = application:stop(emqx_bridge),
|
||||||
|
@ -77,13 +77,19 @@ init_per_testcase(t_too_many_requests, Config) ->
|
||||||
),
|
),
|
||||||
ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
|
ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
|
||||||
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
||||||
|
init_per_testcase(t_rule_action_expired, Config) ->
|
||||||
|
[
|
||||||
|
{bridge_name, ?BRIDGE_NAME}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
Server = start_http_server(#{response_delay_ms => 0}),
|
Server = start_http_server(#{response_delay_ms => 0}),
|
||||||
[{http_server, Server} | Config].
|
[{http_server, Server} | Config].
|
||||||
|
|
||||||
end_per_testcase(TestCase, _Config) when
|
end_per_testcase(TestCase, _Config) when
|
||||||
TestCase =:= t_path_not_found;
|
TestCase =:= t_path_not_found;
|
||||||
TestCase =:= t_too_many_requests
|
TestCase =:= t_too_many_requests;
|
||||||
|
TestCase =:= t_rule_action_expired
|
||||||
->
|
->
|
||||||
ok = emqx_bridge_http_connector_test_server:stop(),
|
ok = emqx_bridge_http_connector_test_server:stop(),
|
||||||
persistent_term:erase({?MODULE, times_called}),
|
persistent_term:erase({?MODULE, times_called}),
|
||||||
|
@ -202,6 +208,7 @@ parse_http_request_assertive(ReqStr0) ->
|
||||||
bridge_async_config(#{port := Port} = Config) ->
|
bridge_async_config(#{port := Port} = Config) ->
|
||||||
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
||||||
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
||||||
|
Host = maps:get(host, Config, "localhost"),
|
||||||
Path = maps:get(path, Config, ""),
|
Path = maps:get(path, Config, ""),
|
||||||
PoolSize = maps:get(pool_size, Config, 1),
|
PoolSize = maps:get(pool_size, Config, 1),
|
||||||
QueryMode = maps:get(query_mode, Config, "async"),
|
QueryMode = maps:get(query_mode, Config, "async"),
|
||||||
|
@ -218,7 +225,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
end,
|
end,
|
||||||
ConfigString = io_lib:format(
|
ConfigString = io_lib:format(
|
||||||
"bridges.~s.~s {\n"
|
"bridges.~s.~s {\n"
|
||||||
" url = \"http://localhost:~p~s\"\n"
|
" url = \"http://~s:~p~s\"\n"
|
||||||
" connect_timeout = \"~p\"\n"
|
" connect_timeout = \"~p\"\n"
|
||||||
" enable = true\n"
|
" enable = true\n"
|
||||||
%% local_topic
|
%% local_topic
|
||||||
|
@ -248,6 +255,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
[
|
[
|
||||||
Type,
|
Type,
|
||||||
Name,
|
Name,
|
||||||
|
Host,
|
||||||
Port,
|
Port,
|
||||||
Path,
|
Path,
|
||||||
ConnectTimeout,
|
ConnectTimeout,
|
||||||
|
@ -540,6 +548,61 @@ t_too_many_requests(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_rule_action_expired(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
RuleTopic = <<"t/webhook/rule">>,
|
||||||
|
BridgeConfig = bridge_async_config(#{
|
||||||
|
type => ?BRIDGE_TYPE,
|
||||||
|
name => ?BRIDGE_NAME,
|
||||||
|
host => "non.existent.host",
|
||||||
|
port => 9999,
|
||||||
|
path => <<"/some/path">>,
|
||||||
|
resume_interval => "100ms",
|
||||||
|
connect_timeout => "1s",
|
||||||
|
request_timeout => "100ms",
|
||||||
|
resource_request_ttl => "100ms"
|
||||||
|
}),
|
||||||
|
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
|
||||||
|
{ok, #{<<"id">> := RuleId}} =
|
||||||
|
emqx_bridge_testlib:create_rule_and_action_http(?BRIDGE_TYPE, RuleTopic, Config),
|
||||||
|
Msg = emqx_message:make(RuleTopic, <<"timeout">>),
|
||||||
|
emqx:publish(Msg),
|
||||||
|
?retry(
|
||||||
|
_Interval = 500,
|
||||||
|
_NAttempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
matched := 1,
|
||||||
|
failed := 0,
|
||||||
|
dropped := 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?retry(
|
||||||
|
_Interval = 500,
|
||||||
|
_NAttempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
matched := 1,
|
||||||
|
'actions.failed' := 1,
|
||||||
|
'actions.failed.unknown' := 1,
|
||||||
|
'actions.total' := 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% helpers
|
%% helpers
|
||||||
do_t_async_retries(TestContext, Error, Fn) ->
|
do_t_async_retries(TestContext, Error, Fn) ->
|
||||||
#{error_attempts := ErrorAttempts} = TestContext,
|
#{error_attempts := ErrorAttempts} = TestContext,
|
||||||
|
|
|
@ -24,7 +24,11 @@
|
||||||
-type callback_mode() :: always_sync | async_if_possible.
|
-type callback_mode() :: always_sync | async_if_possible.
|
||||||
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
|
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
|
||||||
-type result() :: term().
|
-type result() :: term().
|
||||||
-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
|
-type reply_fun() ::
|
||||||
|
{fun((result(), Args :: term()) -> any()), Args :: term()}
|
||||||
|
| {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
|
||||||
|
| undefined.
|
||||||
|
-type reply_context() :: #{reply_dropped => boolean()}.
|
||||||
-type query_opts() :: #{
|
-type query_opts() :: #{
|
||||||
%% The key used for picking a resource worker
|
%% The key used for picking a resource worker
|
||||||
pick_key => term(),
|
pick_key => term(),
|
||||||
|
|
|
@ -366,6 +366,7 @@ resume_from_blocked(Data) ->
|
||||||
true -> #{dropped_expired => length(Batch)};
|
true -> #{dropped_expired => length(Batch)};
|
||||||
false -> #{}
|
false -> #{}
|
||||||
end,
|
end,
|
||||||
|
batch_reply_dropped(Batch, {error, request_expired}),
|
||||||
NData = aggregate_counters(Data, Counters),
|
NData = aggregate_counters(Data, Counters),
|
||||||
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
||||||
resume_from_blocked(NData);
|
resume_from_blocked(NData);
|
||||||
|
@ -378,6 +379,7 @@ resume_from_blocked(Data) ->
|
||||||
{batch, Ref, NotExpired, Expired} ->
|
{batch, Ref, NotExpired, Expired} ->
|
||||||
NumExpired = length(Expired),
|
NumExpired = length(Expired),
|
||||||
ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
|
ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
|
||||||
|
batch_reply_dropped(Expired, {error, request_expired}),
|
||||||
NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
|
NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
|
||||||
?tp(buffer_worker_retry_expired, #{expired => Expired}),
|
?tp(buffer_worker_retry_expired, #{expired => Expired}),
|
||||||
%% We retry msgs in inflight window sync, as if we send them
|
%% We retry msgs in inflight window sync, as if we send them
|
||||||
|
@ -484,6 +486,9 @@ do_reply_caller({F, Args}, {async_return, Result}) ->
|
||||||
%% decision has to be made by the caller
|
%% decision has to be made by the caller
|
||||||
do_reply_caller({F, Args}, Result);
|
do_reply_caller({F, Args}, Result);
|
||||||
do_reply_caller({F, Args}, Result) when is_function(F) ->
|
do_reply_caller({F, Args}, Result) when is_function(F) ->
|
||||||
|
_ = erlang:apply(F, Args ++ [Result]),
|
||||||
|
ok;
|
||||||
|
do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
|
||||||
_ = erlang:apply(F, Args ++ [Result]),
|
_ = erlang:apply(F, Args ++ [Result]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -537,11 +542,13 @@ flush(Data0) ->
|
||||||
{[], _AllExpired} ->
|
{[], _AllExpired} ->
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
NumExpired = length(Batch),
|
NumExpired = length(Batch),
|
||||||
|
batch_reply_dropped(Batch, {error, request_expired}),
|
||||||
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
||||||
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
|
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
|
||||||
flush(Data3);
|
flush(Data3);
|
||||||
{NotExpired, Expired} ->
|
{NotExpired, Expired} ->
|
||||||
NumExpired = length(Expired),
|
NumExpired = length(Expired),
|
||||||
|
batch_reply_dropped(Expired, {error, request_expired}),
|
||||||
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
||||||
IsBatch = (BatchSize > 1),
|
IsBatch = (BatchSize > 1),
|
||||||
%% We *must* use the new queue, because we currently can't
|
%% We *must* use the new queue, because we currently can't
|
||||||
|
@ -809,6 +816,28 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts)
|
||||||
end,
|
end,
|
||||||
{ShouldAck, PostFn, DeltaCounters}.
|
{ShouldAck, PostFn, DeltaCounters}.
|
||||||
|
|
||||||
|
%% This is basically used only by rule actions. To prevent rule action metrics from
|
||||||
|
%% becoming inconsistent when we drop messages, we need a way to signal the rule engine that
|
||||||
|
%% this action has reached a conclusion.
|
||||||
|
-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
|
||||||
|
reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when
|
||||||
|
is_function(Fn), is_list(Args)
|
||||||
|
->
|
||||||
|
%% We want to avoid bumping metrics inside the buffer worker, since it's costly.
|
||||||
|
emqx_pool:async_submit(Fn, Args ++ [Result]),
|
||||||
|
ok;
|
||||||
|
reply_dropped(_ReplyTo, _Result) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
|
||||||
|
batch_reply_dropped(Batch, Result) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
|
||||||
|
reply_dropped(ReplyTo, Result)
|
||||||
|
end,
|
||||||
|
Batch
|
||||||
|
).
|
||||||
|
|
||||||
%% This is only called by `simple_{,a}sync_query', so we can bump the
|
%% This is only called by `simple_{,a}sync_query', so we can bump the
|
||||||
%% counters here.
|
%% counters here.
|
||||||
handle_query_result(Id, Result, HasBeenSent) ->
|
handle_query_result(Id, Result, HasBeenSent) ->
|
||||||
|
@ -1164,7 +1193,7 @@ handle_async_reply1(
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
buffer_worker := BufferWorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
|
||||||
} = ReplyContext,
|
} = ReplyContext,
|
||||||
Result
|
Result
|
||||||
) ->
|
) ->
|
||||||
|
@ -1178,7 +1207,11 @@ handle_async_reply1(
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
%% evaluate metrics call here since we're not inside
|
%% evaluate metrics call here since we're not inside
|
||||||
%% buffer worker
|
%% buffer worker
|
||||||
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
IsAcked andalso
|
||||||
|
begin
|
||||||
|
emqx_resource_metrics:late_reply_inc(Id),
|
||||||
|
reply_dropped(ReplyTo, {error, late_reply})
|
||||||
|
end,
|
||||||
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
|
@ -1292,6 +1325,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
%% evaluate metrics call here since we're not inside buffer
|
%% evaluate metrics call here since we're not inside buffer
|
||||||
%% worker
|
%% worker
|
||||||
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
|
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
|
||||||
|
batch_reply_dropped(RealExpired, {error, late_reply}),
|
||||||
case RealNotExpired of
|
case RealNotExpired of
|
||||||
[] ->
|
[] ->
|
||||||
%% all expired, no need to update back the inflight batch
|
%% all expired, no need to update back the inflight batch
|
||||||
|
|
|
@ -350,7 +350,7 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env
|
||||||
"bridge_action",
|
"bridge_action",
|
||||||
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
||||||
),
|
),
|
||||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
|
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
|
||||||
case
|
case
|
||||||
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
||||||
of
|
of
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed rule action metrics inconsistency where dropped requests were not accounted for.
|
Loading…
Reference in New Issue