feat(buffer_worker): also use the inflight table for sync requests
Related: https://emqx.atlassian.net/browse/EMQX-8692

This should also account correctly for the `retried.*` metrics of sync requests. It also fixes cases where races while retrying async requests could lead to inconsistent metrics, and more cases where a stale reference to `replayq` was accidentally held after a `pop`.
Parent: c89114b0e3
Commit: 80976708e8
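The diff below threads a pre-allocated `Ref` through `call_query` and splits the result handlers into a pure classification step plus a deferred side-effect closure. As a reading aid, here is a minimal, self-contained sketch of that deferred-metrics pattern — it is not code from this commit; the module name, the `io:format` placeholders, and the bare ETS table stand in for the real metric bumps and inflight table:

    -module(deferred_metrics_sketch).
    -export([handle_result_pure/1, reply_after_query/3]).

    %% Pure part: classify the result, defer every side effect into PostFn.
    %% The patched code returns {ShouldBlock, PostFn} from handle_query_result_pure/4.
    handle_result_pure(ok) ->
        {false, fun() -> io:format("success counter +1~n") end};
    handle_result_pure({error, Reason}) ->
        {true, fun() -> io:format("failed counter +1: ~p~n", [Reason]) end}.

    %% Impure caller: only the process that actually removes the inflight entry
    %% runs PostFn, so a retry racing with an async completion cannot bump the
    %% same counters twice for one request.
    reply_after_query(InflightTab, Ref, Result) ->
        {ShouldBlock, PostFn} = handle_result_pure(Result),
        IsDropped =
            case ets:take(InflightTab, Ref) of
                [_Entry] -> true;
                [] -> false
            end,
        IsDropped andalso PostFn(),
        ShouldBlock.

The same idea appears below for batches, where the fold in `batch_reply_caller_defer_metrics' accumulates a list of post-functions and runs them with `lists:foreach/2' once the outcome is known.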
@@ -114,10 +114,13 @@ simple_sync_query(Id, Request) ->
     %% would mess up the metrics anyway. `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     Index = undefined,
-    QueryOpts = #{},
+    QueryOpts = #{perform_inflight_capacity_check => false},
     emqx_resource_metrics:matched_inc(Id),
-    Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts),
-    _ = handle_query_result(Id, Result, false, false),
+    Ref = make_message_ref(),
+    Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
+    HasBeenSent = false,
+    BlockWorker = false,
+    _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
     Result.

 -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
@@ -129,10 +132,13 @@ simple_async_query(Id, Request, ReplyFun) ->
     %% would mess up the metrics anyway. `undefined' is ignored by
     %% `emqx_resource_metrics:*_shift/3'.
     Index = undefined,
-    QueryOpts = #{},
+    QueryOpts = #{perform_inflight_capacity_check => false},
     emqx_resource_metrics:matched_inc(Id),
-    Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts),
-    _ = handle_query_result(Id, Result, false, false),
+    Ref = make_message_ref(),
+    Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts),
+    HasBeenSent = false,
+    BlockWorker = false,
+    _ = handle_query_result(Id, Result, HasBeenSent, BlockWorker),
     Result.

 -spec block(pid() | atom()) -> ok.
@@ -313,16 +319,27 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
+            Data = Data0#{queue := Q1},
             QueryOpts = #{inflight_name => Name},
-            Result = call_query(configured, Id, Index, Query, QueryOpts),
+            Ref = make_message_ref(),
+            Result = call_query(configured, Id, Index, Ref, Query, QueryOpts),
             Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
             case reply_caller(Id, Reply) of
                 true ->
-                    {keep_state, Data0, {state_timeout, ResumeT, resume}};
+                    %% Still failed, but now it's in the inflight
+                    %% table and marked as sent, except if the result
+                    %% says inflight is full. In this case, we must
+                    %% ensure it's indeed in the inflight table or
+                    %% risk losing it.
+                    ok = replayq:ack(Q1, QAckRef),
+                    is_inflight_full_result(Result) andalso
+                        inflight_append(Name, Ref, Query, Id, Index),
+                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+                    {keep_state, Data, {state_timeout, ResumeT, resume}};
                 false ->
                     ok = replayq:ack(Q1, QAckRef),
+                    is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
                     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
-                    Data = Data0#{queue := Q1},
                     retry_queue(Data)
             end
     end;
@@ -341,8 +358,10 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, Batch0} ->
+            Data = Data0#{queue := Q1},
             QueryOpts = #{inflight_name => Name},
-            Result = call_query(configured, Id, Index, Batch0, QueryOpts),
+            Ref = make_message_ref(),
+            Result = call_query(configured, Id, Index, Ref, Batch0, QueryOpts),
             %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
             %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
             Batch = [
@@ -352,12 +371,21 @@ retry_queue(
             case batch_reply_caller(Id, Result, Batch) of
                 true ->
                     ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}),
-                    {keep_state, Data0, {state_timeout, ResumeT, resume}};
+                    %% Still failed, but now it's in the inflight
+                    %% table and marked as sent, except if the result
+                    %% says inflight is full. In this case, we must
+                    %% ensure it's indeed in the inflight table or
+                    %% risk losing it.
+                    ok = replayq:ack(Q1, QAckRef),
+                    is_inflight_full_result(Result) andalso
+                        inflight_append(Name, Ref, Batch, Id, Index),
+                    emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
+                    {keep_state, Data, {state_timeout, ResumeT, resume}};
                 false ->
                     ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}),
                     ok = replayq:ack(Q1, QAckRef),
+                    is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
                     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
-                    Data = Data0#{queue := Q1},
                     retry_queue(Data)
             end
     end.
@@ -372,15 +400,25 @@ retry_inflight_sync(
     QueryOpts = #{},
     %% if we are retrying an inflight query, it has been sent
     HasBeenSent = true,
-    Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts),
+    Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     BlockWorker = false,
-    case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of
+    case handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) of
         %% Send failed because resource is down
-        true ->
+        {true, PostFn} ->
+            PostFn(),
+            ?tp(resource_worker_retry_inflight_failed, #{query_or_batch => QueryOrBatch}),
             {keep_state, Data0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
-        false ->
-            inflight_drop(Name, Ref, Id, Index),
+        {false, PostFn} ->
+            IsDropped = inflight_drop(Name, Ref, Id, Index),
+            %% we need to defer bumping the counters after
+            %% `inflight_drop' to avoid the race condition when an
+            %% inflight request might get completed concurrently with
+            %% the retry, bumping them twice. Since both inflight
+            %% requests (repeated and original) have the same `Ref',
+            %% we bump the counter when removing it from the table.
+            IsDropped andalso PostFn(),
+            ?tp(resource_worker_retry_inflight_succeeded, #{query_or_batch => QueryOrBatch}),
             do_resume(Data0)
     end.
@@ -442,10 +480,12 @@ flush(Data0) ->
             %% `nack' a `pop'.
             %% Maybe we could re-open the queue?
             Data1 = Data0#{queue := Q1},
+            Ref = make_message_ref(),
             do_flush(Data1, #{
                 new_queue => Q1,
                 is_batch => IsBatch,
                 batch => Batch,
+                ref => Ref,
                 ack_ref => QAckRef
             })
     end.
@@ -456,7 +496,16 @@ flush(Data0) ->
     ack_ref := replayq:ack_ref()
 }) ->
     gen_statem:event_handler_result(state(), data()).
-do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
+do_flush(
+    Data0,
+    #{
+        is_batch := false,
+        batch := Batch,
+        ref := Ref,
+        ack_ref := QAckRef,
+        new_queue := Q1
+    }
+) ->
     #{
         id := Id,
         index := Index,
@@ -465,37 +514,23 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
     %% unwrap when not batching (i.e., batch size == 1)
     [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
     QueryOpts = #{inflight_name => Name},
-    Result = call_query(configured, Id, Index, Request, QueryOpts),
-    IsAsync = is_async(Id),
+    Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
     Data1 = cancel_flush_timer(Data0),
     Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
-    case {reply_caller(Id, Reply), IsAsync} of
-        %% failed and is not async; keep the request in the queue to
-        %% be retried
-        {true, false} ->
-            %% Note: currently, we cannot safely pop an item from
-            %% `replayq', keep the old reference to the queue and
-            %% later try to append new items to the old ref: by
-            %% popping an item, we may cause the side effect of
-            %% closing an open segment and opening a new one, and the
-            %% later `append' with the old file descriptor will fail
-            %% with `einval' because it has been closed. So we are
-            %% forced to re-append the item, changing the order of
-            %% requests...
-            ok = replayq:ack(Q1, QAckRef),
-            SentBatch = mark_as_sent(Batch),
-            Q2 = append_queue(Id, Index, Q1, SentBatch),
-            Data2 = Data1#{queue := Q2},
-            {next_state, blocked, Data2};
-        %% failed and is async; remove the request from the queue, as
-        %% it is already in inflight table
-        {true, true} ->
+    case reply_caller(Id, Reply) of
+        %% Failed; remove the request from the queue, as we cannot pop
+        %% from it again. But we must ensure it's in the inflight
+        %% table, even if it's full, so we don't lose the request.
+        %% And only in that case.
+        true ->
             ok = replayq:ack(Q1, QAckRef),
+            is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Request, Id, Index),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             {next_state, blocked, Data1};
-        %% success; just ack
-        {false, _} ->
+        %% Success; just ack.
+        false ->
             ok = replayq:ack(Q1, QAckRef),
+            is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             case replayq:count(Q1) > 0 of
                 true ->
@@ -504,7 +539,13 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
                     {keep_state, Data1}
             end
     end;
-do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) ->
+do_flush(Data0, #{
+    is_batch := true,
+    batch := Batch,
+    ref := Ref,
+    ack_ref := QAckRef,
+    new_queue := Q1
+}) ->
     #{
         id := Id,
         index := Index,
@@ -512,36 +553,22 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
         name := Name
     } = Data0,
     QueryOpts = #{inflight_name => Name},
-    Result = call_query(configured, Id, Index, Batch, QueryOpts),
-    IsAsync = is_async(Id),
+    Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
     Data1 = cancel_flush_timer(Data0),
-    case {batch_reply_caller(Id, Result, Batch), IsAsync} of
-        %% failed and is not async; keep the request in the queue to
-        %% be retried
-        {true, false} ->
-            %% Note: currently, we cannot safely pop an item from
-            %% `replayq', keep the old reference to the queue and
-            %% later try to append new items to the old ref: by
-            %% popping an item, we may cause the side effect of
-            %% closing an open segment and opening a new one, and the
-            %% later `append' with the old file descriptor will fail
-            %% with `einval' because it has been closed. So we are
-            %% forced to re-append the item, changing the order of
-            %% requests...
-            ok = replayq:ack(Q1, QAckRef),
-            SentBatch = mark_as_sent(Batch),
-            Q2 = append_queue(Id, Index, Q1, SentBatch),
-            Data2 = Data1#{queue := Q2},
-            {next_state, blocked, Data2};
-        %% failed and is async; remove the request from the queue, as
-        %% it is already in inflight table
-        {true, true} ->
+    case batch_reply_caller(Id, Result, Batch) of
+        %% Failed; remove the request from the queue, as we cannot pop
+        %% from it again. But we must ensure it's in the inflight
+        %% table, even if it's full, so we don't lose the request.
+        %% And only in that case.
+        true ->
             ok = replayq:ack(Q1, QAckRef),
+            is_inflight_full_result(Result) andalso inflight_append(Name, Ref, Batch, Id, Index),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             {next_state, blocked, Data1};
-        %% success; just ack
-        {false, _} ->
+        %% Success; just ack.
+        false ->
             ok = replayq:ack(Q1, QAckRef),
+            is_async(Id) orelse inflight_drop(Name, Ref, Id, Index),
             emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
             CurrentCount = replayq:count(Q1),
             case {CurrentCount > 0, CurrentCount >= BatchSize} of
@@ -556,23 +583,34 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
     end.

 batch_reply_caller(Id, BatchResult, Batch) ->
+    {ShouldBlock, PostFns} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch),
+    lists:foreach(fun(F) -> F() end, PostFns),
+    ShouldBlock.
+
+batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
     lists:foldl(
-        fun(Reply, BlockWorker) ->
-            reply_caller(Id, Reply, BlockWorker)
+        fun(Reply, {BlockWorker, PostFns}) ->
+            {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply, BlockWorker),
+            {ShouldBlock, [PostFn | PostFns]}
         end,
-        false,
+        {false, []},
         %% the `Mod:on_batch_query/3` returns a single result for a batch,
         %% so we need to expand
         ?EXPAND(BatchResult, Batch)
     ).

 reply_caller(Id, Reply) ->
-    BlockWorker = false,
-    reply_caller(Id, Reply, BlockWorker).
+    {ShouldBlock, PostFn} = reply_caller_defer_metrics(Id, Reply),
+    PostFn(),
+    ShouldBlock.

-reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
-    handle_query_result(Id, Result, HasBeenSent, BlockWorker);
-reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
+reply_caller_defer_metrics(Id, Reply) ->
+    BlockWorker = false,
+    reply_caller_defer_metrics(Id, Reply, BlockWorker).
+
+reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) ->
+    handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker);
+reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when
     is_function(ReplyFun)
 ->
     _ =
@@ -580,55 +618,89 @@ reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker)
             {async_return, _} -> no_reply_for_now;
             _ -> apply(ReplyFun, Args ++ [Result])
         end,
-    handle_query_result(Id, Result, HasBeenSent, BlockWorker);
-reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) ->
+    handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker);
+reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) ->
     gen_statem:reply(From, Result),
-    handle_query_result(Id, Result, HasBeenSent, BlockWorker).
+    handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker).

-handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) ->
+handle_query_result(Id, Result, HasBeenSent, BlockWorker) ->
+    {ShouldBlock, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker),
+    PostFn(),
+    ShouldBlock.
+
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{msg => resource_exception, info => Msg}),
         inc_sent_failed(Id, HasBeenSent),
-    BlockWorker;
-handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when
     NotWorking == not_connected; NotWorking == blocked
 ->
-    true;
-handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) ->
+    {true, fun() -> ok end};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}),
         emqx_resource_metrics:dropped_resource_not_found_inc(Id),
-    BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) ->
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}),
         emqx_resource_metrics:dropped_resource_stopped_inc(Id),
-    BlockWorker;
-handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) ->
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
         emqx_resource_metrics:dropped_other_inc(Id),
-    BlockWorker;
-handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) ->
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) ->
     %% the message will be queued in replayq or inflight window,
     %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
     %% sent this message.
+    PostFn = fun() ->
         ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
-    true;
-handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) ->
+        ok
+    end,
+    {true, PostFn};
+handle_query_result_pure(Id, {error, Reason}, HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
         inc_sent_failed(Id, HasBeenSent),
-    BlockWorker;
-handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) ->
-    true;
-handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) ->
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) ->
+    {true, fun() -> ok end};
+handle_query_result_pure(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
         inc_sent_failed(Id, HasBeenSent),
-    BlockWorker;
-handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) ->
-    BlockWorker;
-handle_query_result(Id, Result, HasBeenSent, BlockWorker) ->
+        ok
+    end,
+    {BlockWorker, PostFn};
+handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) ->
+    {BlockWorker, fun() -> ok end};
+handle_query_result_pure(Id, Result, HasBeenSent, BlockWorker) ->
+    PostFn = fun() ->
         assert_ok_result(Result),
         inc_sent_success(Id, HasBeenSent),
-    BlockWorker.
+        ok
+    end,
+    {BlockWorker, PostFn}.

-call_query(QM0, Id, Index, Query, QueryOpts) ->
+is_inflight_full_result({async_return, inflight_full}) ->
+    true;
+is_inflight_full_result(_) ->
+    false.
+
+call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
     ?tp(call_query_enter, #{id => Id, query => Query}),
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
@@ -638,7 +710,7 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
                    _ -> QM0
                end,
            CM = maps:get(callback_mode, Data),
-            apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts);
+            apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Ref, Query, ResSt, QueryOpts);
        {ok, _Group, #{status := stopped}} ->
            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
        {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
@@ -665,20 +737,34 @@ call_query(QM0, Id, Index, Query, QueryOpts) ->
    end
 ).

-apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _QueryOpts) ->
-    ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
-    ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
-apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
+apply_query_fun(sync, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
+    ?tp(call_query, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
+    Name = maps:get(inflight_name, QueryOpts, undefined),
+    PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
+    ?APPLY_RESOURCE(
+        call_query,
+        case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
+            true ->
+                %% should be kept in the inflight table and retried
+                %% when resuming.
+                {async_return, inflight_full};
+            false ->
+                ok = inflight_append(Name, Ref, Query, Id, Index),
+                Mod:on_query(Id, Request, ResSt)
+        end,
+        Request
+    );
+apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
+    PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
     ?APPLY_RESOURCE(
         call_query_async,
-        case is_inflight_full(Name) of
+        case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:reply_after_query/7,
-                Ref = make_message_ref(),
                 Args = [self(), Id, Index, Name, Ref, Query],
                 ok = inflight_append(Name, Ref, Query, Id, Index),
                 Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
@@ -686,21 +772,35 @@ apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, Que
        end,
        Request
    );
-apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _QueryOpts) ->
+apply_query_fun(sync, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
+    Name = maps:get(inflight_name, QueryOpts, undefined),
+    PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
     Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
-    ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
-apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
+    ?APPLY_RESOURCE(
+        call_batch_query,
+        case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
+            true ->
+                %% should be kept in the inflight table and retried
+                %% when resuming.
+                {async_return, inflight_full};
+            false ->
+                ok = inflight_append(Name, Ref, Batch, Id, Index),
+                Mod:on_batch_query(Id, Requests, ResSt)
+        end,
+        Batch
+    );
+apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
     Name = maps:get(inflight_name, QueryOpts, undefined),
+    PerformInflightCapacityCheck = maps:get(perform_inflight_capacity_check, QueryOpts, true),
     ?APPLY_RESOURCE(
         call_batch_query_async,
-        case is_inflight_full(Name) of
+        case PerformInflightCapacityCheck andalso is_inflight_full(Name) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:batch_reply_after_query/7,
-                Ref = make_message_ref(),
                 ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
                 Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
                 ok = inflight_append(Name, Ref, Batch, Id, Index),
@@ -714,29 +814,36 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent),
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
-    case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
-        true ->
+    case reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)) of
+        {true, PostFn} ->
+            PostFn(),
             ?MODULE:block(Pid);
-        false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+        {false, PostFn} ->
+            IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index),
+            IsDropped andalso PostFn(),
+            ok
     end.

 batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
-    case batch_reply_caller(Id, Result, Batch) of
-        true ->
+    case batch_reply_caller_defer_metrics(Id, Result, Batch) of
+        {true, PostFns} ->
+            lists:foreach(fun(F) -> F() end, PostFns),
             ?MODULE:block(Pid);
-        false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+        {false, PostFns} ->
+            IsDropped = drop_inflight_and_resume(Pid, Name, Ref, Id, Index),
+            IsDropped andalso lists:foreach(fun(F) -> F() end, PostFns),
+            ok
     end.

 drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
     case is_inflight_full(Name) of
         true ->
-            inflight_drop(Name, Ref, Id, Index),
-            ?MODULE:resume(Pid);
+            IsDropped = inflight_drop(Name, Ref, Id, Index),
+            ?MODULE:resume(Pid),
+            IsDropped;
         false ->
             inflight_drop(Name, Ref, Id, Index)
     end.
@@ -836,16 +943,18 @@ inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
     ok;
 inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
     Batch = mark_as_sent(Batch0),
-    ets:insert(Name, {Ref, Batch}),
+    IsNew = ets:insert_new(Name, {Ref, Batch}),
     BatchSize = length(Batch),
-    ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
+    IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ?tp(resource_worker_appended_to_inflight, #{batch => Batch, is_new => IsNew}),
     ok;
 inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
     Query = mark_as_sent(Query0),
-    ets:insert(Name, {Ref, Query}),
-    ets:update_counter(Name, ?SIZE_REF, {2, 1}),
+    IsNew = ets:insert_new(Name, {Ref, Query}),
+    IsNew andalso ets:update_counter(Name, ?SIZE_REF, {2, 1}),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ?tp(resource_worker_appended_to_inflight, #{query => Query, is_new => IsNew}),
     ok;
 inflight_append(Name, Ref, Data, _Id, _Index) ->
     ets:insert(Name, {Ref, Data}),
@@ -853,8 +962,8 @@ inflight_append(Name, Ref, Data, _Id, _Index) ->
     %% the inflight metric.
     ok.

-inflight_drop(undefined, _, _Id, _Index) ->
-    ok;
+inflight_drop(undefined, _Ref, _Id, _Index) ->
+    false;
 inflight_drop(Name, Ref, Id, Index) ->
     Count =
         case ets:take(Name, Ref) of
@@ -862,9 +971,10 @@ inflight_drop(Name, Ref, Id, Index) ->
             [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
             _ -> 0
         end,
-    Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
+    IsDropped = Count > 0,
+    IsDropped andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
     emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
-    ok.
+    IsDropped.

 %%==============================================================================
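The `inflight_append'/`inflight_drop' hunks above lean on ETS semantics to keep the size counter and the `inflight' gauge consistent under concurrent appends and drops. Below is a small illustrative sketch of the same idea — the module, function, and key names are invented and stand in for the worker's real `?SIZE_REF' bookkeeping:

    -module(inflight_sketch).
    -export([new/0, append/3, drop/2]).

    -define(SIZE_KEY, size).

    new() ->
        Tab = ets:new(inflight, [public, set]),
        true = ets:insert(Tab, {?SIZE_KEY, 0}),
        Tab.

    %% `ets:insert_new/2' refuses duplicate Refs, so re-appending the same
    %% request (e.g. on a retry) cannot inflate the size counter.
    append(Tab, Ref, Item) ->
        IsNew = ets:insert_new(Tab, {Ref, Item}),
        IsNew andalso ets:update_counter(Tab, ?SIZE_KEY, {2, 1}),
        IsNew.

    %% `ets:take/2' hands out an entry at most once, so whichever process wins
    %% the race decrements the counter (clamped at zero) and reports true; the
    %% loser sees [] and reports false, like the patched inflight_drop above.
    drop(Tab, Ref) ->
        case ets:take(Tab, Ref) of
            [_] ->
                ets:update_counter(Tab, ?SIZE_KEY, {2, -1, 0, 0}),
                true;
            [] ->
                false
        end.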
@@ -411,8 +411,21 @@ t_query_counter_async_inflight(_) ->

     %% send async query to make the inflight window full
     ?check_trace(
-        ?TRACE_OPTS,
+        begin
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(
+                    #{
+                        ?snk_kind := resource_worker_appended_to_inflight,
+                        is_new := true
+                    }
+                ),
+                WindowSize,
+                _Timeout = 5_000
+            ),
             inc_counter_in_parallel(WindowSize, ReqOpts),
+            {ok, _} = snabbkaffe:receive_events(SRef),
+            ok
+        end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
             ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace)
@@ -455,29 +468,29 @@ t_query_counter_async_inflight(_) ->
        %% +1 because the tmp_query above will be retried and succeed
        %% this time.
        WindowSize + 1,
-        _Timeout = 60_000
+        _Timeout0 = 10_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    tap_metrics(?LINE),
    {ok, _} = snabbkaffe:receive_events(SRef0),
+    tap_metrics(?LINE),
    %% since the previous tmp_query was enqueued to be retried, we
    %% take it again from the table; this time, it should have
    %% succeeded.
    ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)),
-    ?assertEqual(WindowSize, ets:info(Tab0, size)),
+    ?assertEqual(WindowSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
    tap_metrics(?LINE),

    %% send async query, this time everything should be ok.
    Num = 10,
    ?check_trace(
-        ?TRACE_OPTS,
        begin
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
                Num,
-                _Timeout = 60_000
+                _Timeout0 = 10_000
            ),
-            inc_counter_in_parallel(Num, ReqOpts),
+            inc_counter_in_parallel_increasing(Num, 1, ReqOpts),
            {ok, _} = snabbkaffe:receive_events(SRef),
            ok
        end,
@@ -502,17 +515,18 @@ t_query_counter_async_inflight(_) ->
    ),

    %% this will block the resource_worker
-    ok = emqx_resource:query(?ID, {inc_counter, 1}),
+    ok = emqx_resource:query(?ID, {inc_counter, 4}),

    Sent = WindowSize + Num + WindowSize,
    {ok, SRef1} = snabbkaffe:subscribe(
        ?match_event(#{?snk_kind := connector_demo_inc_counter_async}),
        WindowSize,
-        _Timeout = 60_000
+        _Timeout0 = 10_000
    ),
    ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)),
    {ok, _} = snabbkaffe:receive_events(SRef1),
    ?assertEqual(Sent, ets:info(Tab0, size)),
+    tap_metrics(?LINE),

    {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter),
    ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]),
@@ -842,6 +856,8 @@ t_stop_start(_) ->
    ?assert(is_process_alive(Pid0)),

    %% metrics are reset when recreating
+    %% depending on timing, might show the request we just did.
+    ct:sleep(500),
    ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),

    ok = emqx_resource:stop(?ID),
@@ -861,6 +877,7 @@ t_stop_start(_) ->
    ?assert(is_process_alive(Pid1)),

    %% now stop while resetting the metrics
+    ct:sleep(500),
    emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
    emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
    ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
@@ -1059,7 +1076,7 @@ t_retry_batch(_Config) ->
            %% batch shall remain enqueued.
            {ok, _} =
                snabbkaffe:block_until(
-                    ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}),
+                    ?match_n_events(2, #{?snk_kind := resource_worker_retry_inflight_failed}),
                    5_000
                ),
            %% should not have increased the matched count with the retries
@@ -1071,7 +1088,7 @@ t_retry_batch(_Config) ->
            {ok, {ok, _}} =
                ?wait_async_action(
                    ok = emqx_resource:simple_sync_query(?ID, resume),
-                    #{?snk_kind := resource_worker_retry_queue_batch_succeeded},
+                    #{?snk_kind := resource_worker_retry_inflight_succeeded},
                    5_000
                ),
            %% 1 more because of the `resume' call
@@ -1124,17 +1141,16 @@ t_delete_and_re_create_with_same_name(_Config) ->
    ),
    %% pre-condition: we should have just created a new queue
    Queuing0 = emqx_resource_metrics:queuing_get(?ID),
+    Inflight0 = emqx_resource_metrics:inflight_get(?ID),
    ?assertEqual(0, Queuing0),
+    ?assertEqual(0, Inflight0),
    ?check_trace(
        begin
            ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)),
            NumRequests = 10,
            {ok, SRef} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := resource_worker_appended_to_queue}),
-                %% +1 because the first request will fail,
-                %% block the resource, and will be
-                %% re-appended to the queue.
-                NumRequests + 1,
+                NumRequests,
                _Timeout = 5_000
            ),
            %% ensure replayq offloads to disk
@@ -1153,8 +1169,11 @@ t_delete_and_re_create_with_same_name(_Config) ->
            {ok, _} = snabbkaffe:receive_events(SRef),

            %% ensure that stuff got enqueued into disk
+            tap_metrics(?LINE),
            Queuing1 = emqx_resource_metrics:queuing_get(?ID),
-            ?assertEqual(NumRequests, Queuing1),
+            Inflight1 = emqx_resource_metrics:inflight_get(?ID),
+            ?assertEqual(NumRequests - 1, Queuing1),
+            ?assertEqual(1, Inflight1),

            %% now, we delete the resource
            ok = emqx_resource:remove_local(?ID),
@@ -1182,7 +1201,9 @@ t_delete_and_re_create_with_same_name(_Config) ->

            %% it shouldn't have anything enqueued, as it's a fresh resource
            Queuing2 = emqx_resource_metrics:queuing_get(?ID),
+            Inflight2 = emqx_resource_metrics:queuing_get(?ID),
            ?assertEqual(0, Queuing2),
+            ?assertEqual(0, Inflight2),

            ok
        end,
@@ -1219,6 +1240,29 @@ inc_counter_in_parallel(N, Opts0) ->
        || Pid <- Pids
    ].

+inc_counter_in_parallel_increasing(N, StartN, Opts0) ->
+    Parent = self(),
+    Pids = [
+        erlang:spawn(fun() ->
+            Opts =
+                case is_function(Opts0) of
+                    true -> Opts0();
+                    false -> Opts0
+                end,
+            emqx_resource:query(?ID, {inc_counter, M}, Opts),
+            Parent ! {complete, self()}
+        end)
+     || M <- lists:seq(StartN, StartN + N - 1)
+    ],
+    [
+        receive
+            {complete, Pid} -> ok
+        after 1000 ->
+            ct:fail({wait_for_query_timeout, Pid})
+        end
+     || Pid <- Pids
+    ].
+
 bin_config() ->
     <<"\"name\": \"test_resource\"">>.
@@ -508,14 +508,16 @@ install_telemetry_handler(TestCase) ->

 wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) ->
     Events = receive_all_events(GaugeName, Timeout),
-    case lists:last(Events) of
+    case length(Events) > 0 andalso lists:last(Events) of
        #{measurements := #{gauge_set := ExpectedValue}} ->
            ok;
        #{measurements := #{gauge_set := Value}} ->
            ct:fail(
                "gauge ~p didn't reach expected value ~p; last value: ~p",
                [GaugeName, ExpectedValue, Value]
-            )
+            );
+        false ->
+            ct:pal("no ~p gauge events received!", [GaugeName])
     end.

 receive_all_events(EventName, Timeout) ->
@@ -605,6 +607,8 @@ t_publish_success(Config) ->
        ResourceId,
        #{n_events => ExpectedInflightEvents, timeout => 5_000}
    ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
@@ -653,6 +657,8 @@ t_publish_success_local_topic(Config) ->
        ResourceId,
        #{n_events => ExpectedInflightEvents, timeout => 5_000}
    ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
@@ -739,6 +745,8 @@ t_publish_templated(Config) ->
        ResourceId,
        #{n_events => ExpectedInflightEvents, timeout => 5_000}
    ),
+    wait_until_gauge_is(queuing, 0, 500),
+    wait_until_gauge_is(inflight, 0, 500),
    assert_metrics(
        #{
            dropped => 0,
@@ -1111,19 +1119,17 @@ do_econnrefused_or_timeout_test(Config, Error) ->
                ResourceId
            );
        {_, sync} ->
-            wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{
-                timeout => 10_000, n_events => 2
-            }),
            %% even waiting, hard to avoid flakiness... simpler to just sleep
            %% a bit until stabilization.
-            ct:sleep(200),
+            wait_until_gauge_is(queuing, 0, 500),
+            wait_until_gauge_is(inflight, 1, 500),
            assert_metrics(
                #{
                    dropped => 0,
                    failed => 0,
-                    inflight => 0,
+                    inflight => 1,
                    matched => 1,
-                    queuing => 1,
+                    queuing => 0,
                    retried => 0,
                    success => 0
                },