feat(buffer_worker): add expiration time to requests

With this, we avoid performing work or replying to callers that are no
longer waiting on a result.

Also introduces two new counters:

- `dropped.expired` :: happens when a request expires before being
  sent downstream
- `late_reply` :: when a response is receive from downstream, but the
  caller is no longer for a reply because the request has expired, and
  the caller might even have retried it.
This commit is contained in:
Thales Macedo Garitezi 2023-01-19 18:07:08 -03:00
parent a5424959c6
commit 6fa6c679bb
11 changed files with 1090 additions and 88 deletions

View File

@ -16,19 +16,21 @@
-define(EMPTY_METRICS,
?METRICS(
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
)
).
-define(METRICS(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,
@ -40,12 +42,14 @@
#{
'dropped' => Dropped,
'dropped.other' => DroppedOther,
'dropped.expired' => DroppedExpired,
'dropped.queue_full' => DroppedQueueFull,
'dropped.resource_not_found' => DroppedResourceNotFound,
'dropped.resource_stopped' => DroppedResourceStopped,
'matched' => Matched,
'queuing' => Queued,
'retried' => Retried,
'late_reply' => LateReply,
'failed' => SentFailed,
'inflight' => SentInflight,
'success' => SentSucc,
@ -59,12 +63,14 @@
-define(metrics(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,
@ -76,12 +82,14 @@
#{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'queuing' := Queued,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
'inflight' := SentInflight,
'success' := SentSucc,

View File

@ -751,11 +751,11 @@ aggregate_metrics(AllMetrics) ->
fun(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
)
) ->
?METRICS(
@ -773,7 +773,9 @@ aggregate_metrics(AllMetrics) ->
M12 + N12,
M13 + N13,
M14 + N14,
M15 + N15
M15 + N15,
M16 + N16,
M17 + N17
)
end,
InitMetrics,
@ -805,11 +807,13 @@ format_metrics(#{
counters := #{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
'success' := SentSucc,
'received' := Rcvd
@ -824,12 +828,14 @@ format_metrics(#{
?METRICS(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,

View File

@ -830,7 +830,8 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>,
<<"request_timeout">> => <<"500ms">>,
%% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>,
%% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">>
}
@ -898,8 +899,10 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
),
Payload1 = <<"hello2">>,
Payload2 = <<"hello3">>,
emqx:publish(emqx_message:make(LocalTopic, Payload1)),
emqx:publish(emqx_message:make(LocalTopic, Payload2)),
%% we need to to it in other processes because it'll block due to
%% the long timeout
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end),
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end),
{ok, _} = snabbkaffe:receive_events(SRef),
%% verify the metrics of the bridge, the message should be queued
@ -917,9 +920,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"matched">> := Matched,
<<"success">> := 1,
<<"failed">> := 0,
<<"queuing">> := 1,
<<"inflight">> := 1
} when Matched >= 3,
<<"queuing">> := Queuing,
<<"inflight">> := Inflight
} when Matched >= 3 andalso Inflight + Queuing == 2,
maps:get(<<"metrics">>, DecodedMetrics1)
),

View File

@ -29,6 +29,8 @@
-type query_opts() :: #{
%% The key used for picking a resource worker
pick_key => term(),
timeout => timeout(),
expire_at => infinity | integer(),
async_reply_fun => reply_fun()
}.
-type resource_data() :: #{

View File

@ -60,21 +60,23 @@
-define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
-define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}).
-define(EXPAND(RESULT, BATCH), [
?REPLY(FROM, REQUEST, SENT, RESULT)
|| ?QUERY(FROM, REQUEST, SENT) <- BATCH
|| ?QUERY(FROM, REQUEST, SENT, _EXPIRE_AT) <- BATCH
]).
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, WorkerMRef),
{Ref, BatchOrQuery, IsRetriable, WorkerMRef}
).
-define(ITEM_IDX, 2).
-define(RETRY_IDX, 3).
-define(WORKER_MREF_IDX, 4).
-type id() :: binary().
-type index() :: pos_integer().
-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()).
-type expire_at() :: infinity | integer().
-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean(), expire_at()).
-type request() :: term().
-type from() :: pid() | reply_fun() | request_from().
-type request_from() :: undefined | gen_statem:from().
@ -98,14 +100,18 @@ start_link(Id, Index, Opts) ->
gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
-spec sync_query(id(), request(), query_opts()) -> Result :: term().
sync_query(Id, Request, Opts) ->
sync_query(Id, Request, Opts0) ->
Opts1 = ensure_timeout_query_opts(Opts0, sync),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()),
Timeout = maps:get(timeout, Opts, timer:seconds(15)),
Timeout = maps:get(timeout, Opts),
emqx_resource_metrics:matched_inc(Id),
pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
-spec async_query(id(), request(), query_opts()) -> Result :: term().
async_query(Id, Request, Opts) ->
async_query(Id, Request, Opts0) ->
Opts1 = ensure_timeout_query_opts(Opts0, async),
Opts = ensure_expire_at(Opts1),
PickKey = maps:get(pick_key, Opts, self()),
emqx_resource_metrics:matched_inc(Id),
pick_cast(Id, PickKey, {query, Request, Opts}).
@ -120,11 +126,15 @@ simple_sync_query(Id, Request) ->
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
Index = undefined,
QueryOpts = #{simple_query => true},
QueryOpts0 = #{simple_query => true, timeout => infinity},
QueryOpts = #{expire_at := ExpireAt} = ensure_expire_at(QueryOpts0),
emqx_resource_metrics:matched_inc(Id),
Ref = make_message_ref(),
Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
HasBeenSent = false,
From = self(),
Result = call_query(
sync, Id, Index, Ref, ?QUERY(From, Request, HasBeenSent, ExpireAt), QueryOpts
),
_ = handle_query_result(Id, Result, HasBeenSent),
Result.
@ -179,9 +189,14 @@ init({Id, Index, Opts}) ->
?tp(buffer_worker_init, #{id => Id, index => Index}),
{ok, running, Data}.
running(enter, _, St) ->
running(enter, _, Data) ->
?tp(buffer_worker_enter_running, #{}),
maybe_flush(St);
%% According to `gen_statem' laws, we mustn't call `maybe_flush'
%% directly because it may decide to return `{next_state, blocked, _}',
%% and that's an invalid response for a state enter call.
%% Returning a next event from a state enter call is also
%% prohibited.
{keep_state, ensure_flush_timer(Data, 0)};
running(cast, resume, _St) ->
keep_state_and_data;
running(cast, flush, Data) ->
@ -243,9 +258,9 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%==============================================================================
-define(PICK(ID, KEY, EXPR),
-define(PICK(ID, KEY, PID, EXPR),
try gproc_pool:pick_worker(ID, KEY) of
Pid when is_pid(Pid) ->
PID when is_pid(PID) ->
EXPR;
_ ->
?RESOURCE_ERROR(worker_not_created, "resource not created")
@ -258,7 +273,7 @@ code_change(_OldVsn, State, _Extra) ->
).
pick_call(Id, Key, Query, Timeout) ->
?PICK(Id, Key, begin
?PICK(Id, Key, Pid, begin
Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef},
@ -281,15 +296,21 @@ pick_call(Id, Key, Query, Timeout) ->
end).
pick_cast(Id, Key, Query) ->
?PICK(Id, Key, begin
?PICK(Id, Key, Pid, begin
From = undefined,
erlang:send(Pid, ?SEND_REQ(From, Query)),
ok
end).
resume_from_blocked(Data) ->
#{inflight_tid := InflightTID} = Data,
case inflight_get_first_retriable(InflightTID) of
?tp(buffer_worker_resume_from_blocked_enter, #{}),
#{
id := Id,
index := Index,
inflight_tid := InflightTID
} = Data,
Now = now_(),
case inflight_get_first_retriable(InflightTID, Now) of
none ->
case is_inflight_full(InflightTID) of
true ->
@ -297,14 +318,32 @@ resume_from_blocked(Data) ->
false ->
{next_state, running, Data}
end;
{Ref, FirstQuery} ->
{expired, Ref, Batch} ->
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
?tp(buffer_worker_retry_expired, #{expired => Batch}),
resume_from_blocked(Data);
{single, Ref, Query} ->
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, FirstQuery, Data)
retry_inflight_sync(Ref, Query, Data)
end;
{batch, Ref, NotExpired, Expired} ->
update_inflight_item(InflightTID, Ref, NotExpired),
NumExpired = length(Expired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
NumExpired > 0 andalso ?tp(buffer_worker_retry_expired, #{expired => Expired}),
%% We retry msgs in inflight window sync, as if we send them
%% async, they will be appended to the end of inflight window again.
case is_inflight_full(InflightTID) of
true ->
{keep_state, Data};
false ->
retry_inflight_sync(Ref, NotExpired, Data)
end
end.
@ -320,10 +359,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult =
case QueryOrBatch of
?QUERY(From, CoreReq, HasBeenSent) ->
?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) ->
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _) | _] = Batch ->
[?QUERY(_, _, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
end,
case ReplyResult of
@ -378,10 +417,12 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
(?SEND_REQ(undefined = _From, {query, Req, Opts})) ->
ReplyFun = maps:get(async_reply_fun, Opts, undefined),
HasBeenSent = false,
?QUERY(ReplyFun, Req, HasBeenSent);
(?SEND_REQ(From, {query, Req, _Opts})) ->
ExpireAt = maps:get(expire_at, Opts),
?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
(?SEND_REQ(From, {query, Req, Opts})) ->
HasBeenSent = false,
?QUERY(From, Req, HasBeenSent)
ExpireAt = maps:get(expire_at, Opts),
?QUERY(From, Req, HasBeenSent, ExpireAt)
end,
Requests
),
@ -406,6 +447,8 @@ maybe_flush(Data0) ->
-spec flush(data()) -> gen_statem:event_handler_result(state(), data()).
flush(Data0) ->
#{
id := Id,
index := Index,
batch_size := BatchSize,
inflight_tid := InflightTID,
queue := Q0
@ -419,25 +462,45 @@ flush(Data0) ->
Data2 = ensure_flush_timer(Data1),
{keep_state, Data2};
{_, false} ->
?tp(buffer_worker_flush_before_pop, #{}),
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
Data2 = Data1#{queue := Q1},
Ref = make_message_ref(),
do_flush(Data2, #{
new_queue => Q1,
is_batch => IsBatch,
batch => Batch,
ref => Ref,
ack_ref => QAckRef
})
?tp(buffer_worker_flush_before_sieve_expired, #{}),
Now = now_(),
%% if the request has expired, the caller is no longer
%% waiting for a response.
case sieve_expired_requests(Batch, Now) of
all_expired ->
ok = replayq:ack(Q1, QAckRef),
emqx_resource_metrics:dropped_expired_inc(Id, length(Batch)),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
flush(Data2);
{NotExpired, Expired} ->
NumExpired = length(Expired),
emqx_resource_metrics:dropped_expired_inc(Id, NumExpired),
IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
%% Maybe we could re-open the queue?
?tp(
buffer_worker_flush_potentially_partial,
#{expired => Expired, not_expired => NotExpired}
),
Ref = make_message_ref(),
do_flush(Data2, #{
new_queue => Q1,
is_batch => IsBatch,
batch => NotExpired,
ref => Ref,
ack_ref => QAckRef
})
end
end.
-spec do_flush(data(), #{
is_batch := boolean(),
batch := [?QUERY(from(), request(), boolean())],
batch := [queue_query()],
ack_ref := replayq:ack_ref(),
ref := inflight_key(),
new_queue := replayq:q()
@ -459,7 +522,7 @@ do_flush(
inflight_tid := InflightTID
} = Data0,
%% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
[?QUERY(From, CoreReq, HasBeenSent, _ExpireAt) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
@ -812,10 +875,10 @@ call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
end
).
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
}),
@ -834,13 +897,13 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
end,
Request
);
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
?tp(call_batch_query, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}),
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
?tp(call_batch_query_async, #{
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
}),
@ -850,7 +913,7 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
begin
ReplyFun = fun ?MODULE:batch_reply_after_query/8,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
Requests = [Request || ?QUERY(_From, Request, _, _ExpireAt) <- Batch],
IsRetriable = false,
WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, WorkerMRef),
@ -862,7 +925,41 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
).
reply_after_query(
Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result
Pid,
Id,
Index,
InflightTID,
Ref,
?QUERY(_From, _Request, _HasBeenSent, ExpireAt) = Query,
QueryOpts,
Result
) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => [Query], ref => Ref}
),
Now = now_(),
case is_expired(ExpireAt, Now) of
true ->
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => [Query]}),
ok;
false ->
do_reply_after_query(Pid, Id, Index, InflightTID, Ref, Query, QueryOpts, Result)
end.
do_reply_after_query(
Pid,
Id,
Index,
InflightTID,
Ref,
?QUERY(From, Request, HasBeenSent, _ExpireAt),
QueryOpts,
Result
) ->
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
@ -875,7 +972,7 @@ reply_after_query(
%% Keep retrying.
?tp(buffer_worker_reply_after_query, #{
action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
ref => Ref,
result => Result
}),
@ -884,7 +981,7 @@ reply_after_query(
ack ->
?tp(buffer_worker_reply_after_query, #{
action => Action,
batch_or_query => ?QUERY(From, Request, HasBeenSent),
batch_or_query => ?QUERY(From, Request, HasBeenSent, _ExpireAt),
ref => Ref,
result => Result
}),
@ -896,6 +993,34 @@ reply_after_query(
end.
batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => Batch, ref => Ref}
),
Now = now_(),
case sieve_expired_requests(Batch, Now) of
all_expired ->
IsFullBefore = is_inflight_full(InflightTID),
IsAcked = ack_inflight(InflightTID, Ref, Id, Index),
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
IsFullBefore andalso ?MODULE:flush_worker(Pid),
?tp(buffer_worker_reply_after_query_expired, #{expired => Batch}),
ok;
{NotExpired, Expired} ->
NumExpired = length(Expired),
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
NumExpired > 0 andalso
?tp(buffer_worker_reply_after_query_expired, #{expired => Expired}),
do_batch_reply_after_query(
Pid, Id, Index, InflightTID, Ref, NotExpired, QueryOpts, Result
)
end.
do_batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
?tp(
buffer_worker_reply_after_query_enter,
#{batch_or_query => Batch, ref => Ref}
),
%% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the
%% inflight window.
@ -986,9 +1111,12 @@ inflight_new(InfltWinSZ, Id, Index) ->
),
TableId.
-spec inflight_get_first_retriable(ets:tid()) ->
none | {integer(), [?QUERY(_, _, _)] | ?QUERY(_, _, _)}.
inflight_get_first_retriable(InflightTID) ->
-spec inflight_get_first_retriable(ets:tid(), integer()) ->
none
| {expired, inflight_key(), [queue_query()]}
| {single, inflight_key(), queue_query()}
| {batch, inflight_key(), _NotExpired :: [queue_query()], _Expired :: [queue_query()]}.
inflight_get_first_retriable(InflightTID, Now) ->
MatchSpec =
ets:fun2ms(
fun(?INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, _WorkerMRef)) when
@ -1000,8 +1128,22 @@ inflight_get_first_retriable(InflightTID) ->
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
'$end_of_table' ->
none;
{[{Ref, BatchOrQuery}], _Continuation} ->
{Ref, BatchOrQuery}
{[{Ref, Query = ?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
case is_expired(ExpireAt, Now) of
true ->
{expired, Ref, [Query]};
false ->
{single, Ref, Query}
end;
{[{Ref, Batch = [_ | _]}], _Continuation} ->
%% batch is non-empty because we check that in
%% `sieve_expired_requests'.
case sieve_expired_requests(Batch, Now) of
all_expired ->
{expired, Ref, Batch};
{NotExpired, Expired} ->
{batch, Ref, NotExpired, Expired}
end
end.
is_inflight_full(undefined) ->
@ -1030,7 +1172,7 @@ inflight_append(undefined, _InflightItem, _Id, _Index) ->
ok;
inflight_append(
InflightTID,
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, WorkerMRef),
Id,
Index
) ->
@ -1044,7 +1186,9 @@ inflight_append(
ok;
inflight_append(
InflightTID,
?INFLIGHT_ITEM(Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, IsRetriable, WorkerMRef),
?INFLIGHT_ITEM(
Ref, ?QUERY(_From, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, WorkerMRef
),
Id,
Index
) ->
@ -1106,9 +1250,9 @@ ack_inflight(undefined, _Ref, _Id, _Index) ->
ack_inflight(InflightTID, Ref, Id, Index) ->
Count =
case ets:take(InflightTID, Ref) of
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _), _IsRetriable, _WorkerMRef)] ->
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _WorkerMRef)] ->
1;
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _WorkerMRef)] ->
length(Batch);
_ ->
0
@ -1133,6 +1277,12 @@ mark_inflight_items_as_retriable(Data, WorkerMRef) ->
?tp(buffer_worker_worker_down_update, #{num_affected => _NumAffected}),
ok.
%% used to update a batch after dropping expired individual queries.
update_inflight_item(InflightTID, Ref, NewBatch) ->
_ = ets:update_element(InflightTID, Ref, {?ITEM_IDX, NewBatch}),
?tp(buffer_worker_worker_update_inflight_item, #{ref => Ref}),
ok.
%%==============================================================================
inc_sent_failed(Id, _HasBeenSent = true) ->
@ -1180,11 +1330,14 @@ clear_disk_queue_dir(Id, Index) ->
Res
end.
ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) ->
ensure_flush_timer(Data = #{batch_time := T}) ->
ensure_flush_timer(Data, T).
ensure_flush_timer(Data = #{tref := undefined}, T) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),
Data#{tref => {TRef, Ref}};
ensure_flush_timer(Data) ->
ensure_flush_timer(Data, _T) ->
Data.
cancel_flush_timer(St = #{tref := undefined}) ->
@ -1195,7 +1348,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
-spec make_message_ref() -> inflight_key().
make_message_ref() ->
erlang:monotonic_time(nanosecond).
now_().
collect_requests(Acc, Limit) ->
Count = length(Acc),
@ -1213,9 +1366,9 @@ do_collect_requests(Acc, Count, Limit) ->
mark_as_sent(Batch) when is_list(Batch) ->
lists:map(fun mark_as_sent/1, Batch);
mark_as_sent(?QUERY(From, Req, _)) ->
mark_as_sent(?QUERY(From, Req, _HasBeenSent, ExpireAt)) ->
HasBeenSent = true,
?QUERY(From, Req, HasBeenSent).
?QUERY(From, Req, HasBeenSent, ExpireAt).
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true;
@ -1235,3 +1388,49 @@ is_async_return({async_return, _}) ->
true;
is_async_return(_) ->
false.
sieve_expired_requests(Batch, Now) ->
{Expired, NotExpired} =
lists:partition(
fun(?QUERY(_From, _CoreReq, _HasBeenSent, ExpireAt)) ->
is_expired(ExpireAt, Now)
end,
Batch
),
case {NotExpired, Expired} of
{[], []} ->
%% Should be impossible for batch_size >= 1.
all_expired;
{[], [_ | _]} ->
all_expired;
{[_ | _], _} ->
{NotExpired, Expired}
end.
-spec is_expired(infinity | integer(), integer()) -> boolean().
is_expired(infinity = _ExpireAt, _Now) ->
false;
is_expired(ExpireAt, Now) ->
Now > ExpireAt.
now_() ->
erlang:monotonic_time(nanosecond).
-spec ensure_timeout_query_opts(query_opts(), sync | async) -> query_opts().
ensure_timeout_query_opts(#{timeout := _} = Opts, _SyncOrAsync) ->
Opts;
ensure_timeout_query_opts(#{} = Opts0, sync) ->
TimeoutMS = timer:seconds(15),
Opts0#{timeout => TimeoutMS};
ensure_timeout_query_opts(#{} = Opts0, async) ->
Opts0#{timeout => infinity}.
-spec ensure_expire_at(query_opts()) -> query_opts().
ensure_expire_at(#{expire_at := _} = Opts) ->
Opts;
ensure_expire_at(#{timeout := infinity} = Opts) ->
Opts#{expire_at => infinity};
ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
TimeoutNS = erlang:convert_time_unit(TimeoutMS, millisecond, nanosecond),
ExpireAt = now_() + TimeoutNS,
Opts#{expire_at => ExpireAt}.

View File

@ -134,8 +134,10 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
'retried.success',
'retried.failed',
'success',
'late_reply',
'failed',
'dropped',
'dropped.expired',
'dropped.queue_full',
'dropped.resource_not_found',
'dropped.resource_stopped',

View File

@ -34,6 +34,9 @@
dropped_other_inc/1,
dropped_other_inc/2,
dropped_other_get/1,
dropped_expired_inc/1,
dropped_expired_inc/2,
dropped_expired_get/1,
dropped_queue_full_inc/1,
dropped_queue_full_inc/2,
dropped_queue_full_get/1,
@ -46,6 +49,9 @@
failed_inc/1,
failed_inc/2,
failed_get/1,
late_reply_inc/1,
late_reply_inc/2,
late_reply_get/1,
matched_inc/1,
matched_inc/2,
matched_get/1,
@ -75,9 +81,11 @@ events() ->
[?TELEMETRY_PREFIX, Event]
|| Event <- [
dropped_other,
dropped_expired,
dropped_queue_full,
dropped_resource_not_found,
dropped_resource_stopped,
late_reply,
failed,
inflight,
matched,
@ -114,6 +122,9 @@ handle_telemetry_event(
dropped_other ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.other', Val);
dropped_expired ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.expired', Val);
dropped_queue_full ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val);
@ -123,6 +134,8 @@ handle_telemetry_event(
dropped_resource_stopped ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val),
emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_stopped', Val);
late_reply ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'late_reply', Val);
failed ->
emqx_metrics_worker:inc(?RES_METRICS, ID, 'failed', Val);
matched ->
@ -211,6 +224,30 @@ dropped_other_inc(ID, Val) ->
dropped_other_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.other').
%% @doc Count of messages dropped due to being expired before being sent.
dropped_expired_inc(ID) ->
dropped_expired_inc(ID, 1).
dropped_expired_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, dropped_expired], #{counter_inc => Val}, #{
resource_id => ID
}).
dropped_expired_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.expired').
%% @doc Count of messages that were sent but received a late reply.
late_reply_inc(ID) ->
late_reply_inc(ID, 1).
late_reply_inc(ID, Val) ->
telemetry:execute([?TELEMETRY_PREFIX, late_reply], #{counter_inc => Val}, #{
resource_id => ID
}).
late_reply_get(ID) ->
emqx_metrics_worker:get(?RES_METRICS, ID, 'late_reply').
%% @doc Count of messages dropped because the queue was full
dropped_queue_full_inc(ID) ->
dropped_queue_full_inc(ID, 1).

View File

@ -260,7 +260,7 @@ counter_loop(
?tp(connector_demo_inc_counter_async, #{n => N}),
State#{counter => Num + N};
{big_payload, _Payload, ReplyFun} when Status == blocked ->
apply_reply(ReplyFun, {error, blocked}),
apply_reply(ReplyFun, {error, {recoverable_error, blocked}}),
State;
{{FromPid, ReqRef}, {inc, N}} when Status == running ->
%ct:pal("sync counter recv: ~p", [{inc, N}]),

View File

@ -232,7 +232,7 @@ t_batch_query_counter(_) ->
fun(Result, Trace) ->
?assertMatch({ok, 0}, Result),
QueryTrace = ?of_kind(call_batch_query, Trace),
?assertMatch([#{batch := [{query, _, get_counter, _}]}], QueryTrace)
?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace)
end
),
@ -284,7 +284,7 @@ t_query_counter_async_query(_) ->
fun(Trace) ->
%% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
%% simple query ignores the query_mode and batching settings in the resource_worker
@ -295,7 +295,7 @@ t_query_counter_async_query(_) ->
?assertMatch({ok, 1000}, Result),
%% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end
),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
@ -337,7 +337,7 @@ t_query_counter_async_callback(_) ->
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
@ -348,7 +348,7 @@ t_query_counter_async_callback(_) ->
fun(Result, Trace) ->
?assertMatch({ok, 1000}, Result),
QueryTrace = ?of_kind(call_query, Trace),
?assertMatch([#{query := {query, _, get_counter, _}}], QueryTrace)
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end
),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) ->
),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
tap_metrics(?LINE),
@ -476,7 +476,7 @@ t_query_counter_async_inflight(_) ->
end,
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace),
?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
tap_metrics(?LINE),
ok
@ -495,7 +495,7 @@ t_query_counter_async_inflight(_) ->
),
fun(Trace) ->
QueryTrace = ?of_kind(call_query_async, Trace),
?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
end
),
@ -605,8 +605,8 @@ t_query_counter_async_inflight_batch(_) ->
[
#{
batch := [
{query, _, {inc_counter, 1}, _},
{query, _, {inc_counter, 1}, _}
{query, _, {inc_counter, 1}, _, _},
{query, _, {inc_counter, 1}, _, _}
]
}
| _
@ -687,7 +687,7 @@ t_query_counter_async_inflight_batch(_) ->
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
[#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
[#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
QueryTrace
)
end
@ -712,7 +712,7 @@ t_query_counter_async_inflight_batch(_) ->
fun(Trace) ->
QueryTrace = ?of_kind(call_batch_query_async, Trace),
?assertMatch(
[#{batch := [{query, _, {inc_counter, _}, _} | _]} | _],
[#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
QueryTrace
)
end
@ -1499,9 +1499,680 @@ t_async_pool_worker_death(_Config) ->
),
ok.
t_expiration_sync_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
worker_pool_size => 1,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(sync).
t_expiration_sync_batch_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(sync).
t_expiration_async_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
worker_pool_size => 1,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(async).
t_expiration_async_batch_before_sending(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 1_000
}
),
do_t_expiration_before_sending(async).
do_t_expiration_before_sending(QueryMode) ->
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
?force_ordering(
#{?snk_kind := buffer_worker_flush_before_pop},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{?snk_kind := buffer_worker_flush_before_sieve_expired}
),
TimeoutMS = 100,
spawn_link(fun() ->
case QueryMode of
sync ->
?assertError(
timeout,
emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
);
async ->
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
)
end
end),
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(#{?snk_kind := buffer_worker_flush_all_expired}, 4 * TimeoutMS),
ok
end,
fun(Trace) ->
?assertMatch(
[#{batch := [{query, _, {inc_counter, 99}, _, _}]}],
?of_kind(buffer_worker_flush_all_expired, Trace)
),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 2,
%% the block call
success := 1,
dropped := 1,
'dropped.expired' := 1,
retried := 0,
failed := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_sync_before_sending_partial_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_before_sending_partial_batch(sync).
t_expiration_async_before_sending_partial_batch(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_before_sending_partial_batch(async).
do_t_expiration_before_sending_partial_batch(QueryMode) ->
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
?force_ordering(
#{?snk_kind := buffer_worker_flush_before_pop},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{?snk_kind := buffer_worker_flush_before_sieve_expired}
),
Pid0 =
spawn_link(fun() ->
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
),
?tp(infinity_query_returned, #{})
end),
TimeoutMS = 100,
Pid1 =
spawn_link(fun() ->
case QueryMode of
sync ->
?assertError(
timeout,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
);
async ->
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
)
end
end),
Pid2 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
),
ok = emqx_resource:simple_sync_query(?ID, resume),
case QueryMode of
async ->
{ok, _} = ?block_until(
#{
?snk_kind := buffer_worker_reply_after_query,
action := ack,
batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
},
10 * TimeoutMS
);
sync ->
%% more time because it needs to retry if sync
{ok, _} = ?block_until(#{?snk_kind := infinity_query_returned}, 20 * TimeoutMS)
end,
lists:foreach(
fun(Pid) ->
unlink(Pid),
exit(Pid, kill)
end,
[Pid0, Pid1, Pid2]
),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{
expired := [{query, _, {inc_counter, 199}, _, _}],
not_expired := [{query, _, {inc_counter, 99}, _, _}]
}
],
?of_kind(buffer_worker_flush_potentially_partial, Trace)
),
wait_until_gauge_is(inflight, 0, 500),
Metrics = tap_metrics(?LINE),
case QueryMode of
async ->
?assertMatch(
#{
counters := #{
matched := 4,
%% the block call, the request with
%% infinity timeout, and the resume
%% call.
success := 3,
dropped := 1,
'dropped.expired' := 1,
%% was sent successfully and held by
%% the test connector.
retried := 0,
failed := 0
}
},
Metrics
);
sync ->
?assertMatch(
#{
counters := #{
matched := 4,
%% the block call, the request with
%% infinity timeout, and the resume
%% call.
success := 3,
dropped := 1,
'dropped.expired' := 1,
%% currently, the test connector
%% replies with an error that may make
%% the buffer worker retry.
retried := Retried,
failed := 0
}
} when Retried =< 1,
Metrics
)
end,
ok
end
),
ok.
t_expiration_async_after_reply(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 1,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 1_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_async_after_reply(single).
t_expiration_async_batch_after_reply(_Config) ->
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 2_000
}
),
install_telemetry_handler(?FUNCTION_NAME),
do_t_expiration_async_after_reply(batch).
do_t_expiration_async_after_reply(IsBatch) ->
?check_trace(
begin
NAcks =
case IsBatch of
batch -> 1;
single -> 2
end,
?force_ordering(
#{?snk_kind := buffer_worker_flush_ack},
NAcks,
#{?snk_kind := delay_enter},
_Guard = true
),
?force_ordering(
#{?snk_kind := delay},
#{
?snk_kind := buffer_worker_reply_after_query_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
}
),
TimeoutMS = 100,
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
),
?assertEqual(
ok, emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => infinity})
),
Pid0 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_flush_potentially_partial}, 4 * TimeoutMS
),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS
),
unlink(Pid0),
exit(Pid0, kill),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{
expired := [{query, _, {inc_counter, 199}, _, _}]
}
],
?of_kind(buffer_worker_reply_after_query_expired, Trace)
),
wait_telemetry_event(success, #{n_events => 1, timeout => 4_000}),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 2,
%% the request with infinity timeout.
success := 1,
dropped := 0,
late_reply := 1,
retried := 0,
failed := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_batch_all_expired_after_reply(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(async_if_possible),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => async,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
?check_trace(
begin
?force_ordering(
#{?snk_kind := buffer_worker_flush_ack},
#{?snk_kind := delay_enter}
),
?force_ordering(
#{?snk_kind := delay},
#{
?snk_kind := buffer_worker_reply_after_query_enter,
batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
}
),
TimeoutMS = 200,
?assertEqual(
ok,
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
),
Pid0 =
spawn_link(fun() ->
?tp(delay_enter, #{}),
ct:sleep(2 * TimeoutMS),
?tp(delay, #{}),
ok
end),
{ok, _} = ?block_until(
#{?snk_kind := buffer_worker_reply_after_query_expired}, 10 * TimeoutMS
),
unlink(Pid0),
exit(Pid0, kill),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{
expired := [{query, _, {inc_counter, 199}, _, _}]
}
],
?of_kind(buffer_worker_reply_after_query_expired, Trace)
),
Metrics = tap_metrics(?LINE),
?assertMatch(
#{
counters := #{
matched := 1,
success := 0,
dropped := 0,
late_reply := 1,
retried := 0,
failed := 0
}
},
Metrics
),
ok
end
),
ok.
t_expiration_retry(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 1,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 300
}
),
do_t_expiration_retry(single).
t_expiration_retry_batch(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => 300
}
),
do_t_expiration_retry(batch).
do_t_expiration_retry(IsBatch) ->
ResumeInterval = 300,
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
%% the request that expires must be first, so it's the
%% head of the inflight table (and retriable).
{ok, SRef1} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_appended_to_queue}),
1,
ResumeInterval * 2
),
spawn_link(fun() ->
?assertError(
timeout,
emqx_resource:query(
?ID,
{inc_counter, 1},
#{timeout => TimeoutMS}
)
)
end),
Pid1 =
spawn_link(fun() ->
receive
go -> ok
end,
?assertEqual(
ok,
emqx_resource:query(
?ID,
{inc_counter, 2},
#{timeout => infinity}
)
)
end),
{ok, _} = snabbkaffe:receive_events(SRef1),
Pid1 ! go,
{ok, _} = snabbkaffe:receive_events(SRef0),
{ok, _} =
?block_until(
#{?snk_kind := buffer_worker_retry_expired},
ResumeInterval * 10
),
SuccessEventKind =
case IsBatch of
batch -> buffer_worker_retry_inflight_succeeded;
single -> buffer_worker_flush_ack
end,
{ok, {ok, _}} =
?wait_async_action(
emqx_resource:simple_sync_query(?ID, resume),
#{?snk_kind := SuccessEventKind},
ResumeInterval * 5
),
ok
end,
fun(Trace) ->
?assertMatch(
[#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
?of_kind(buffer_worker_retry_expired, Trace)
),
ok
end
),
ok.
t_expiration_retry_batch_multiple_times(_Config) ->
ResumeInterval = 300,
emqx_connector_demo:set_callback_mode(always_sync),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
query_mode => sync,
batch_size => 2,
batch_time => 100,
worker_pool_size => 1,
resume_interval => ResumeInterval
}
),
?check_trace(
begin
ok = emqx_resource:simple_sync_query(?ID, block),
{ok, SRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := buffer_worker_flush_nack}),
1,
200
),
TimeoutMS = 100,
spawn_link(fun() ->
?assertError(
timeout,
emqx_resource:query(
?ID,
{inc_counter, 1},
#{timeout => TimeoutMS}
)
)
end),
spawn_link(fun() ->
?assertError(
timeout,
emqx_resource:query(
?ID,
{inc_counter, 2},
#{timeout => ResumeInterval + TimeoutMS}
)
)
end),
{ok, _} = snabbkaffe:receive_events(SRef),
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := buffer_worker_retry_expired}),
ResumeInterval * 10
),
ok
end,
fun(Trace) ->
?assertMatch(
[
#{expired := [{query, _, {inc_counter, 1}, _, _}]},
#{expired := [{query, _, {inc_counter, 2}, _, _}]}
],
?of_kind(buffer_worker_retry_expired, Trace)
),
ok
end
),
ok.
%%------------------------------------------------------------------------------
%% Helpers
%%------------------------------------------------------------------------------
inc_counter_in_parallel(N) ->
inc_counter_in_parallel(N, #{}).
@ -1564,6 +2235,81 @@ tap_metrics(Line) ->
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}.
install_telemetry_handler(TestCase) ->
Tid = ets:new(TestCase, [ordered_set, public]),
HandlerId = TestCase,
TestPid = self(),
_ = telemetry:attach_many(
HandlerId,
emqx_resource_metrics:events(),
fun(EventName, Measurements, Metadata, _Config) ->
Data = #{
name => EventName,
measurements => Measurements,
metadata => Metadata
},
ets:insert(Tid, {erlang:monotonic_time(), Data}),
TestPid ! {telemetry, Data},
ok
end,
unused_config
),
on_exit(fun() ->
telemetry:detach(HandlerId),
ets:delete(Tid)
end),
put({?MODULE, telemetry_table}, Tid),
Tid.
wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
Events = receive_all_events(GaugeName, Timeout),
case length(Events) > 0 andalso lists:last(Events) of
#{measurements := #{gauge_set := ExpectedValue}} ->
ok;
#{measurements := #{gauge_set := Value}} ->
ct:fail(
"gauge ~p didn't reach expected value ~p; last value: ~p",
[GaugeName, ExpectedValue, Value]
);
false ->
ct:pal("no ~p gauge events received!", [GaugeName])
end.
receive_all_events(EventName, Timeout) ->
receive_all_events(EventName, Timeout, []).
receive_all_events(EventName, Timeout, Acc) ->
receive
{telemetry, #{name := [_, _, EventName]} = Event} ->
receive_all_events(EventName, Timeout, [Event | Acc])
after Timeout ->
lists:reverse(Acc)
end.
wait_telemetry_event(EventName) ->
wait_telemetry_event(EventName, #{timeout => 5_000, n_events => 1}).
wait_telemetry_event(
EventName,
Opts0
) ->
DefaultOpts = #{timeout => 5_000, n_events => 1},
#{timeout := Timeout, n_events := NEvents} = maps:merge(DefaultOpts, Opts0),
wait_n_events(NEvents, Timeout, EventName).
wait_n_events(NEvents, _Timeout, _EventName) when NEvents =< 0 ->
ok;
wait_n_events(NEvents, Timeout, EventName) ->
TelemetryTable = get({?MODULE, telemetry_table}),
receive
{telemetry, #{name := [_, _, EventName]}} ->
wait_n_events(NEvents - 1, Timeout, EventName)
after Timeout ->
RecordedEvents = ets:tab2list(TelemetryTable),
ct:pal("recorded events: ~p", [RecordedEvents]),
error({timeout_waiting_for_telemetry, EventName})
end.
assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert(

View File

@ -350,10 +350,12 @@ service_account_json(PrivateKeyPEM) ->
metrics_mapping() ->
#{
dropped => fun emqx_resource_metrics:dropped_get/1,
dropped_expired => fun emqx_resource_metrics:dropped_expired_get/1,
dropped_other => fun emqx_resource_metrics:dropped_other_get/1,
dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1,
dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1,
dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1,
late_reply => fun emqx_resource_metrics:late_reply_get/1,
failed => fun emqx_resource_metrics:failed_get/1,
inflight => fun emqx_resource_metrics:inflight_get/1,
matched => fun emqx_resource_metrics:matched_get/1,
@ -1117,9 +1119,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
CurrentMetrics
);
{timeout, async} ->
wait_telemetry_event(TelemetryTable, success, ResourceId, #{
timeout => 10_000, n_events => 2
}),
wait_until_gauge_is(inflight, 0, _Timeout = 400),
wait_until_gauge_is(queuing, 0, _Timeout = 400),
assert_metrics(
@ -1130,7 +1129,8 @@ do_econnrefused_or_timeout_test(Config, Error) ->
matched => 2,
queuing => 0,
retried => 0,
success => 2
success => 0,
late_reply => 2
},
ResourceId
);

View File

@ -58,7 +58,6 @@
%% emqx_resource API
%%-------------------------------------------------------------------------------------------------
%% TODO: check
is_buffer_supported() -> false.
callback_mode() -> async_if_possible.