chore: retry as much as possible, don't reply to caller too soon

This commit is contained in:
Thales Macedo Garitezi 2023-01-17 11:55:40 -03:00
parent b82009bc29
commit fa01deb3eb
10 changed files with 331 additions and 280 deletions

View File

@ -82,7 +82,7 @@ authorize(
} = Config
) ->
Request = generate_request(PubSub, Topic, Client, Config),
case emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of
try emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of
{ok, 204, _Headers} ->
{matched, allow};
{ok, 200, Headers, Body} ->
@ -112,6 +112,16 @@ authorize(
reason => Reason
}),
ignore
catch
error:timeout ->
Reason = timeout,
?tp(authz_http_request_failure, #{error => Reason}),
?SLOG(error, #{
msg => "http_server_query_failed",
resource => ResourceID,
reason => Reason
}),
ignore
end.
log_nomtach_msg(Status, Headers, Body) ->

View File

@ -172,7 +172,7 @@ t_response_handling(_Config) ->
[
#{
?snk_kind := authz_http_request_failure,
error := {recoverable_error, econnrefused}
error := timeout
}
],
?of_kind(authz_http_request_failure, Trace)

View File

@ -170,8 +170,11 @@ send_message(BridgeId, Message) ->
case emqx:get_config([bridges, BridgeType, BridgeName], not_found) of
not_found ->
{error, {bridge_not_found, BridgeId}};
#{enable := true} ->
emqx_resource:query(ResId, {send_message, Message});
#{enable := true} = Config ->
Timeout = emqx_map_lib:deep_get(
[resource_opts, request_timeout], Config, timer:seconds(15)
),
emqx_resource:query(ResId, {send_message, Message}, #{timeout => Timeout});
#{enable := false} ->
{error, {bridge_stopped, BridgeId}}
end.

View File

@ -145,10 +145,12 @@ set_special_configs(_) ->
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_, _Config) ->
clear_resources(),
emqx_common_test_helpers:call_janitor(),
snabbkaffe:stop(),
ok.
clear_resources() ->
@ -478,8 +480,6 @@ t_egress_custom_clientid_prefix(_Config) ->
end,
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_mqtt_conn_bridge_ingress_and_egress(_) ->
@ -830,6 +830,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
<<"request_timeout">> => <<"500ms">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
}
@ -880,17 +881,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
ok = emqx_listeners:stop_listener('tcp:default'),
ct:sleep(1500),
%% PUBLISH 2 messages to the 'local' broker, the message should
ok = snabbkaffe:start_trace(),
%% PUBLISH 2 messages to the 'local' broker, the messages should
%% be enqueued and the resource will block
{ok, SRef} =
snabbkaffe:subscribe(
fun
(
#{
?snk_kind := call_query_enter,
query := {query, _From, {send_message, #{}}, _Sent}
}
) ->
(#{?snk_kind := resource_worker_retry_inflight_failed}) ->
true;
(#{?snk_kind := resource_worker_flush_nack}) ->
true;
(_) ->
false
@ -903,7 +901,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
emqx:publish(emqx_message:make(LocalTopic, Payload1)),
emqx:publish(emqx_message:make(LocalTopic, Payload2)),
{ok, _} = snabbkaffe:receive_events(SRef),
ok = snabbkaffe:stop(),
%% verify the metrics of the bridge, the message should be queued
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),

View File

@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
}
}
request_timeout {
desc {
en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
zh: """请求的超时。 如果<code>query_mode</code>是<code>sync</code>,对资源的调用将在超时前被阻断这一时间。"""
}
label {
en: """Request timeout"""
zh: """请求超时"""
}
}
enable_batch {
desc {
en: """Batch mode enabled."""

View File

@ -100,7 +100,7 @@ start_link(Id, Index, Opts) ->
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts) ->
PickKey = maps:get(pick_key, Opts, self()),
Timeout = maps:get(timeout, Opts, infinity),
Timeout = maps:get(timeout, Opts, timer:seconds(15)),
emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
@ -234,10 +234,7 @@ blocked(cast, flush, Data) ->
blocked(state_timeout, unblock, St) ->
resume_from_blocked(St);
blocked(info, ?SEND_REQ(_ReqFrom, {query, _Request, _Opts}) = Request0, Data0) ->
#{id := Id} = Data0,
{Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = batch_reply_caller(Id, Error, Queries),
{_Queries, Data} = collect_and_enqueue_query_requests(Request0, Data0),
{keep_state, Data};
blocked(info, {flush, _Ref}, _Data) ->
keep_state_and_data;
@ -337,10 +334,16 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
} = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{},
%% if we are retrying an inflight query, it has been sent
HasBeenSent = true,
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
case handle_query_result_pure(Id, Result, HasBeenSent) of
ReplyResult =
case QueryOrBatch of
?QUERY(From, CoreReq, HasBeenSent) ->
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply);
[?QUERY(_, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch)
end,
case ReplyResult of
%% Send failed because resource is down
{nack, PostFn} ->
PostFn(),
@ -476,27 +479,20 @@ do_flush(
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case reply_caller(Id, Reply) of
%% Failed; remove the request from the queue, as we cannot pop
%% from it again. But we must ensure it's in the inflight
%% table, even if it's full, so we don't lose the request.
%% And only in that case.
%% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
%% We might get a retriable response without having added
%% the request to the inflight table (e.g.: sync request,
%% but resource health check failed prior to calling and
%% so we didn't even call it). In that case, we must then
%% add it to the inflight table.
IsRetriable =
is_recoverable_error_result(Result) orelse
is_not_connected_result(Result),
ShouldPreserveInInflight = is_not_connected_result(Result),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
IsRetriable = true,
WorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Request, IsRetriable, WorkerMRef0),
ShouldPreserveInInflight andalso
inflight_append(InflightTID, InflightItem, Id, Index),
IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
%% we must append again to the table to ensure that the
%% request will be retried (i.e., it might not have been
%% inserted during `call_query' if the resource was down
%% and/or if it was a sync request).
inflight_append(InflightTID, InflightItem, Id, Index),
mark_inflight_as_retriable(InflightTID, Ref),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -513,11 +509,21 @@ do_flush(
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
%% Async requests are acked later when the async worker
%% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for
%% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(resource_worker_flush_ack, #{batch_or_query => Request}),
?tp(
resource_worker_flush_ack,
#{
batch_or_query => Request,
result => Result
}
),
case queue_count(Q1) > 0 of
true ->
{keep_state, Data1, [{next_event, internal, flush}]};
@ -542,27 +548,20 @@ do_flush(Data0, #{
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
case batch_reply_caller(Id, Result, Batch) of
%% Failed; remove the request from the queue, as we cannot pop
%% from it again. But we must ensure it's in the inflight
%% table, even if it's full, so we don't lose the request.
%% And only in that case.
%% from it again, but we'll retry it using the inflight table.
nack ->
ok = replayq:ack(Q1, QAckRef),
%% We might get a retriable response without having added
%% the request to the inflight table (e.g.: sync request,
%% but resource health check failed prior to calling and
%% so we didn't even call it). In that case, we must then
%% add it to the inflight table.
IsRetriable =
is_recoverable_error_result(Result) orelse
is_not_connected_result(Result),
ShouldPreserveInInflight = is_not_connected_result(Result),
%% we set it atomically just below; a limitation of having
%% to use tuples for atomic ets updates
IsRetriable = true,
WorkerMRef0 = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef0),
ShouldPreserveInInflight andalso
inflight_append(InflightTID, InflightItem, Id, Index),
IsRetriable andalso mark_inflight_as_retriable(InflightTID, Ref),
%% we must append again to the table to ensure that the
%% request will be retried (i.e., it might not have been
%% inserted during `call_query' if the resource was down
%% and/or if it was a sync request).
inflight_append(InflightTID, InflightItem, Id, Index),
mark_inflight_as_retriable(InflightTID, Ref),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -579,11 +578,21 @@ do_flush(Data0, #{
%% Success; just ack.
ack ->
ok = replayq:ack(Q1, QAckRef),
%% Async requests are acked later when the async worker
%% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for
%% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index),
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(resource_worker_flush_ack, #{batch_or_query => Batch}),
?tp(
resource_worker_flush_ack,
#{
batch_or_query => Batch,
result => Result
}
),
CurrentCount = queue_count(Q1),
case {CurrentCount > 0, CurrentCount >= BatchSize} of
{false, _} ->
@ -597,54 +606,79 @@ do_flush(Data0, #{
end.
batch_reply_caller(Id, BatchResult, Batch) ->
{ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
lists:foreach(fun(F) -> F() end, PostFns),
ShouldBlock.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
lists:foldl(
fun(Reply, {_ShouldBlock, PostFns}) ->
{ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
{ShouldBlock, [PostFn | PostFns]}
end,
{ack, []},
%% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand
?EXPAND(BatchResult, Batch)
).
reply_caller(Id, Reply) ->
{ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
{ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
PostFn(),
ShouldBlock.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
{ShouldAck, PostFns} =
lists:foldl(
fun(Reply, {_ShouldAck, PostFns}) ->
{ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply),
{ShouldAck, [PostFn | PostFns]}
end,
{ack, []},
%% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand
?EXPAND(BatchResult, Batch)
),
PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
{ShouldAck, PostFn}.
reply_caller(Id, Reply) ->
{ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply),
PostFn(),
ShouldAck.
%% Should only reply to the caller when the decision is final (not
%% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) ->
handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when
is_function(ReplyFun)
->
_ =
case Result of
{async_return, _} -> no_reply_for_now;
_ -> apply(ReplyFun, Args ++ [Result])
end,
handle_query_result_pure(Id, Result, HasBeenSent);
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result} of
{nack, _} ->
ok;
{ack, {async_return, _}} ->
ok;
{ack, _} ->
apply(ReplyFun, Args ++ [Result]),
ok
end,
{ShouldAck, PostFn};
reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) ->
gen_statem:reply(From, Result),
handle_query_result_pure(Id, Result, HasBeenSent).
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result} of
{nack, _} ->
ok;
{ack, {async_return, _}} ->
ok;
{ack, _} ->
gen_statem:reply(From, Result),
ok
end,
{ShouldAck, PostFn}.
handle_query_result(Id, Result, HasBeenSent) ->
{ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
PostFn(),
ShouldBlock.
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent) ->
%% We should always retry (nack), except when:
%% * resource is not found
%% * resource is stopped
%% * the result is a success (or at least a delayed result)
%% We also retry even sync requests. In that case, we shouldn't reply
%% the caller until one of those final results above happen.
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{msg => resource_exception, info => Msg}),
inc_sent_failed(Id, HasBeenSent),
%% inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
{nack, PostFn};
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when
NotWorking == not_connected; NotWorking == blocked
->
@ -666,10 +700,12 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
emqx_resource_metrics:dropped_other_inc(Id),
%% emqx_resource_metrics:dropped_other_inc(Id),
ok
end,
{ack, PostFn};
{nack, PostFn};
%% TODO: invert this logic: we should differentiate errors that are
%% irrecoverable; all others are deemed recoverable.
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
%% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
@ -679,22 +715,18 @@ handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent)
ok
end,
{nack, PostFn};
handle_query_result_pure(Id, {error, Reason}, HasBeenSent) ->
handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent) ->
{nack, fun() -> ok end};
handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent) ->
{nack, PostFn};
handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
{nack, PostFn};
handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
{ack, fun() -> ok end};
handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
@ -714,18 +746,6 @@ handle_async_worker_down(Data0, Pid) ->
cancel_inflight_items(Data, WorkerMRef),
{keep_state, Data}.
is_not_connected_result(?RESOURCE_ERROR_M(Error, _)) when
Error =:= not_connected; Error =:= blocked
->
true;
is_not_connected_result(_) ->
false.
is_recoverable_error_result({error, {recoverable_error, _Reason}}) ->
true;
is_recoverable_error_result(_) ->
false.
call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query}),
case emqx_resource_manager:ets_lookup(Id) of
@ -735,8 +755,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
configured -> maps:get(query_mode, Data);
_ -> QM0
end,
CM = maps:get(callback_mode, Data),
apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
CBM = maps:get(callback_mode, Data),
CallMode = call_mode(QM, CBM),
apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
{ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@ -763,20 +784,9 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
end
).
apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => sync}),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
?APPLY_RESOURCE(
call_query,
begin
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_query(Id, Request, ResSt)
end,
Request
);
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
@ -796,23 +806,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
end,
Request
);
apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
InflightTID = maps:get(inflight_tid, QueryOpts, undefined),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
?APPLY_RESOURCE(
call_batch_query,
begin
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
ok = inflight_append(InflightTID, InflightItem, Id, Index),
Mod:on_batch_query(Id, Requests, ResSt)
end,
Batch
);
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
@ -839,27 +838,27 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
{Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)),
%% Should always ack async inflight requests that
%% returned, otherwise the request will get retried. The
%% caller has just been notified of the failure and should
%% decide if it wants to retry or not.
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso PostFn(),
case Action of
nack ->
%% Keep retrying.
?tp(resource_worker_reply_after_query, #{
action => nack,
action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
ref => Ref,
result => Result
}),
mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
ack ->
?tp(resource_worker_reply_after_query, #{
action => ack,
action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
ref => Ref,
result => Result
}),
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso PostFn(),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok
end.
@ -868,24 +867,28 @@ batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
{Action, PostFns} = batch_reply_caller_defer_metrics(Id, Result, Batch),
%% Should always ack async inflight requests that
%% returned, otherwise the request will get retried. The
%% caller has just been notified of the failure and should
%% decide if it wants to retry or not.
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso lists:foreach(fun(F) -> F() end, PostFns),
{Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch),
case Action of
nack ->
%% Keep retrying.
?tp(resource_worker_reply_after_query, #{
action => nack, batch_or_query => Batch, result => Result
action => nack,
batch_or_query => Batch,
ref => Ref,
result => Result
}),
mark_inflight_as_retriable(InflightTID, Ref),
?MODULE:block(Pid);
ack ->
?tp(resource_worker_reply_after_query, #{
action => ack, batch_or_query => Batch, result => Result
action => ack,
batch_or_query => Batch,
ref => Ref,
result => Result
}),
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso PostFn(),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
ok
end.
@ -919,7 +922,14 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
Q1
end,
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q2)),
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
?tp(
resource_worker_appended_to_queue,
#{
id => Id,
items => Queries,
queue_count => queue_count(Q2)
}
),
Q2.
%%==============================================================================
@ -1110,11 +1120,6 @@ do_cancel_inflight_item(Data, Ref) ->
%%==============================================================================
inc_sent_failed(Id, _HasBeenSent = true) ->
emqx_resource_metrics:retried_failed_inc(Id);
inc_sent_failed(Id, _HasBeenSent) ->
emqx_resource_metrics:failed_inc(Id).
inc_sent_success(Id, _HasBeenSent = true) ->
emqx_resource_metrics:retried_success_inc(Id);
inc_sent_success(Id, _HasBeenSent) ->

View File

@ -48,6 +48,7 @@ fields("creation_opts") ->
{health_check_interval, fun health_check_interval/1},
{auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1},
{request_timeout, fun request_timeout/1},
{async_inflight_window, fun async_inflight_window/1},
{enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1},
@ -80,6 +81,11 @@ query_mode(default) -> async;
query_mode(required) -> false;
query_mode(_) -> undefined.
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
request_timeout(desc) -> ?DESC("request_timeout");
request_timeout(default) -> <<"15s">>;
request_timeout(_) -> undefined.
enable_batch(type) -> boolean();
enable_batch(required) -> false;
enable_batch(default) -> true;

View File

@ -259,6 +259,9 @@ counter_loop(
apply_reply(ReplyFun, ok),
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
{big_payload, _Payload, ReplyFun} when Status == blocked ->
apply_reply(ReplyFun, {error, blocked}),
State;
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),
FromPid ! {ReqRef, ok},
@ -269,6 +272,9 @@ counter_loop(
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == blocked ->
FromPid ! {ReqRef, incorrect_status},
State#{incorrect_status_count := IncorrectCount + 1};
{{FromPid, ReqRef}, {big_payload, _Payload}} when Status == running ->
FromPid ! {ReqRef, ok},
State;
{get, ReplyFun} ->
apply_reply(ReplyFun, Num),
State;

View File

@ -411,35 +411,18 @@ t_query_counter_async_inflight(_) ->
%% send async query to make the inflight window full
?check_trace(
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(
#{
?snk_kind := resource_worker_appended_to_inflight,
is_new := true
}
),
WindowSize,
_Timeout = 5_000
{_, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(WindowSize, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full},
1_000
),
inc_counter_in_parallel(WindowSize, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
end
),
tap_metrics(?LINE),
%% this will block the resource_worker as the inflight window is full now
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {inc_counter, 199}),
#{?snk_kind := resource_worker_flush_but_inflight_full},
1_000
),
?assertMatch(0, ets:info(Tab0, size)),
tap_metrics(?LINE),
@ -464,9 +447,9 @@ t_query_counter_async_inflight(_) ->
%% all responses should be received after the resource is resumed.
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
%% +2 because the tmp_query above will be retried and succeed
%% this time, and there was the inc 199 request as well.
WindowSize + 2,
%% +1 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 1,
_Timeout0 = 10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
@ -504,8 +487,12 @@ t_query_counter_async_inflight(_) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(WindowSize, ReqOpts),
{_, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(WindowSize, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full},
1_000
),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@ -584,7 +571,7 @@ t_query_counter_async_inflight_batch(_) ->
end,
ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
BatchSize = 2,
WindowSize = 3,
WindowSize = 15,
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
@ -606,16 +593,12 @@ t_query_counter_async_inflight_batch(_) ->
%% send async query to make the inflight window full
NumMsgs = BatchSize * WindowSize,
?check_trace(
begin
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := call_batch_query_async}),
WindowSize,
_Timeout = 60_000
{_, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(NumMsgs, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full},
5_000
),
inc_counter_in_parallel(NumMsgs, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
ok
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@ -674,7 +657,7 @@ t_query_counter_async_inflight_batch(_) ->
%% +1 because the tmp_query above will be retried and succeed
%% this time.
WindowSize + 1,
_Timeout = 60_000
10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
tap_metrics(?LINE),
@ -695,7 +678,7 @@ t_query_counter_async_inflight_batch(_) ->
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
NumBatches1,
_Timeout = 60_000
10_000
),
inc_counter_in_parallel(NumMsgs1, ReqOpts),
{ok, _} = snabbkaffe:receive_events(SRef),
@ -720,8 +703,12 @@ t_query_counter_async_inflight_batch(_) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
%% again, send async query to make the inflight window full
?check_trace(
?TRACE_OPTS,
inc_counter_in_parallel(WindowSize, ReqOpts),
{_, {ok, _}} =
?wait_async_action(
inc_counter_in_parallel(NumMsgs, ReqOpts),
#{?snk_kind := resource_worker_flush_but_inflight_full},
5_000
),
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
@ -734,11 +721,11 @@ t_query_counter_async_inflight_batch(_) ->
%% this will block the resource_worker
ok = emqx_resource:query(?ID, {inc_counter, 1}),
Sent = NumMsgs + NumMsgs1 + WindowSize,
Sent = NumMsgs + NumMsgs1 + NumMsgs,
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
WindowSize,
_Timeout = 60_000
10_000
),
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
{ok, _} = snabbkaffe:receive_events(SRef1),
@ -785,10 +772,8 @@ t_healthy_timeout(_) ->
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200}
),
?assertMatch(
?RESOURCE_ERROR(not_connected),
emqx_resource:query(?ID, get_state)
),
?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})),
?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
ok = emqx_resource:remove_local(?ID).
t_healthy(_) ->
@ -1131,6 +1116,7 @@ t_retry_batch(_Config) ->
ok.
t_delete_and_re_create_with_same_name(_Config) ->
NumBufferWorkers = 2,
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
@ -1139,7 +1125,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 2,
worker_pool_size => NumBufferWorkers,
queue_seg_bytes => 100,
resume_interval => 1_000
}
@ -1154,19 +1140,21 @@ t_delete_and_re_create_with_same_name(_Config) ->
?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
NumRequests = 10,
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
NumRequests,
?match_event(#{?snk_kind := resource_worker_enter_blocked}),
NumBufferWorkers,
_Timeout = 5_000
),
%% ensure replayq offloads to disk
Payload = binary:copy(<<"a">>, 119),
lists:foreach(
fun(N) ->
{error, _} =
emqx_resource:query(
?ID,
{big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
)
spawn_link(fun() ->
{error, _} =
emqx_resource:query(
?ID,
{big_payload, <<(integer_to_binary(N))/binary, Payload/binary>>}
)
end)
end,
lists:seq(1, NumRequests)
),
@ -1177,10 +1165,11 @@ t_delete_and_re_create_with_same_name(_Config) ->
tap_metrics(?LINE),
Queuing1 = emqx_resource_metrics:queuing_get(?ID),
Inflight1 = emqx_resource_metrics:inflight_get(?ID),
?assertEqual(NumRequests - 1, Queuing1),
?assertEqual(1, Inflight1),
?assert(Queuing1 > 0),
?assertEqual(2, Inflight1),
%% now, we delete the resource
process_flag(trap_exit, true),
ok = emqx_resource:remove_local(?ID),
?assertEqual({error, not_found}, emqx_resource_manager:lookup(?ID)),
@ -1275,9 +1264,13 @@ t_retry_sync_inflight(_Config) ->
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
{{error, {recoverable_error, incorrect_status}}, {ok, _}} =
TestPid = self(),
{_, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
spawn_link(fun() ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res}
end),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
@ -1287,9 +1280,15 @@ t_retry_sync_inflight(_Config) ->
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
receive
{res, Res} ->
?assertEqual(ok, Res)
after 5_000 ->
ct:fail("no response")
end,
ok
end,
[fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
[fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
@ -1312,12 +1311,17 @@ t_retry_sync_inflight_batch(_Config) ->
QueryOpts = #{},
?check_trace(
begin
%% now really make the resource go into `blocked' state.
%% this results in a retriable error when sync.
%% make the resource go into `blocked' state. this
%% results in a retriable error when sync.
ok = emqx_resource:simple_sync_query(?ID, block),
{{error, {recoverable_error, incorrect_status}}, {ok, _}} =
process_flag(trap_exit, true),
TestPid = self(),
{_, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
spawn_link(fun() ->
Res = emqx_resource:query(?ID, {big_payload, <<"a">>}, QueryOpts),
TestPid ! {res, Res}
end),
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
@ -1327,13 +1331,19 @@ t_retry_sync_inflight_batch(_Config) ->
#{?snk_kind := resource_worker_retry_inflight_succeeded},
ResumeInterval * 3
),
receive
{res, Res} ->
?assertEqual(ok, Res)
after 5_000 ->
ct:fail("no response")
end,
ok
end,
[fun ?MODULE:assert_retry_fail_then_succeed_inflight/1]
[fun ?MODULE:assert_sync_retry_fail_then_succeed_inflight/1]
),
ok.
t_dont_retry_async_inflight(_Config) ->
t_retry_async_inflight(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
@ -1351,33 +1361,31 @@ t_dont_retry_async_inflight(_Config) ->
QueryOpts = #{},
?check_trace(
begin
%% block,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, block_now),
#{?snk_kind := resource_worker_enter_blocked},
ResumeInterval * 2
),
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
%% then send an async request; that shouldn't be retriable.
%% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_flush_ack},
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
%% will re-enter running because the single request is not retriable
{ok, _} = ?block_until(
#{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
),
%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_enter_running},
ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_no_retry_inflight/1]
[fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
t_dont_retry_async_inflight_batch(_Config) ->
t_retry_async_inflight_batch(_Config) ->
ResumeInterval = 1_000,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
@ -1396,29 +1404,27 @@ t_dont_retry_async_inflight_batch(_Config) ->
QueryOpts = #{},
?check_trace(
begin
%% block,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, block_now),
#{?snk_kind := resource_worker_enter_blocked},
ResumeInterval * 2
),
%% block
ok = emqx_resource:simple_sync_query(?ID, block),
%% then send an async request; that shouldn't be retriable.
%% then send an async request; that should be retriable.
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:query(?ID, {big_payload, <<"b">>}, QueryOpts),
#{?snk_kind := resource_worker_flush_ack},
#{?snk_kind := resource_worker_retry_inflight_failed},
ResumeInterval * 2
),
%% will re-enter running because the single request is not retriable
{ok, _} = ?block_until(
#{?snk_kind := resource_worker_enter_running}, ResumeInterval * 2
),
%% will reply with success after the resource is healed
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := resource_worker_enter_running},
ResumeInterval * 2
),
ok
end,
[fun ?MODULE:assert_no_retry_inflight/1]
[fun ?MODULE:assert_async_retry_fail_then_succeed_inflight/1]
),
ok.
@ -1529,7 +1535,8 @@ inc_counter_in_parallel(N, Opts0) ->
ct:fail({wait_for_query_timeout, Pid})
end
|| Pid <- Pids
].
],
ok.
inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
Parent = self(),
@ -1566,12 +1573,8 @@ tap_metrics(Line) ->
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}.
assert_no_retry_inflight(Trace) ->
?assertEqual([], ?of_kind(resource_worker_retry_inflight_failed, Trace)),
?assertEqual([], ?of_kind(resource_worker_retry_inflight_succeeded, Trace)),
ok.
assert_retry_fail_then_succeed_inflight(Trace) ->
assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert(
?strict_causality(
#{?snk_kind := resource_worker_flush_nack, ref := _Ref},
@ -1589,3 +1592,23 @@ assert_retry_fail_then_succeed_inflight(Trace) ->
)
),
ok.
assert_async_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert(
?strict_causality(
#{?snk_kind := resource_worker_reply_after_query, action := nack, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
Trace
)
),
%% not strict causality because it might retry more than once
%% before restoring the resource health.
?assert(
?causality(
#{?snk_kind := resource_worker_retry_inflight_failed, ref := _Ref},
#{?snk_kind := resource_worker_retry_inflight_succeeded, ref := _Ref},
Trace
)
),
ok.

View File

@ -108,6 +108,7 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config0) when
TestCase =:= t_publish_success_batch
->
ct:timetrap({seconds, 30}),
case ?config(batch_size, Config0) of
1 ->
[{skip_due_to_no_batching, true}];
@ -120,6 +121,7 @@ init_per_testcase(TestCase, Config0) when
[{telemetry_table, Tid} | Config]
end;
init_per_testcase(TestCase, Config0) ->
ct:timetrap({seconds, 30}),
{ok, _} = start_echo_http_server(),
delete_all_bridges(),
Tid = install_telemetry_handler(TestCase),
@ -283,6 +285,7 @@ gcp_pubsub_config(Config) ->
" pool_size = 1\n"
" pipelining = ~b\n"
" resource_opts = {\n"
" request_timeout = 500ms\n"
" worker_pool_size = 1\n"
" query_mode = ~s\n"
" batch_size = ~b\n"
@ -1266,7 +1269,6 @@ t_failure_no_body(Config) ->
t_unrecoverable_error(Config) ->
ResourceId = ?config(resource_id, Config),
TelemetryTable = ?config(telemetry_table, Config),
QueryMode = ?config(query_mode, Config),
TestPid = self(),
FailureNoBodyHandler =
@ -1328,26 +1330,14 @@ t_unrecoverable_error(Config) ->
ok
end
),
wait_telemetry_event(TelemetryTable, failed, ResourceId),
ExpectedInflightEvents =
case QueryMode of
sync -> 1;
async -> 3
end,
wait_telemetry_event(
TelemetryTable,
inflight,
ResourceId,
#{n_events => ExpectedInflightEvents, timeout => 5_000}
),
%% even waiting, hard to avoid flakiness... simpler to just sleep
%% a bit until stabilization.
ct:sleep(200),
wait_until_gauge_is(queuing, 0, _Timeout = 400),
wait_until_gauge_is(inflight, 1, _Timeout = 400),
assert_metrics(
#{
dropped => 0,
failed => 1,
inflight => 0,
failed => 0,
inflight => 1,
matched => 1,
queuing => 0,
retried => 0,