refactor(resource): resume from queue/inflight-window with async-sending and batching

Shawn 2022-09-19 14:26:46 +08:00
parent cc327629c3
commit b325633390
1 changed file with 105 additions and 62 deletions


@@ -208,14 +208,6 @@ terminate(_Reason, #{id := Id, index := Index}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-queue_item_marshaller(?Q_ITEM(_) = I) ->
-    term_to_binary(I);
-queue_item_marshaller(Bin) when is_binary(Bin) ->
-    binary_to_term(Bin).
-
-estimate_size(QItem) ->
-    size(queue_item_marshaller(QItem)).
-
 %%==============================================================================
 -define(PICK(ID, KEY, EXPR),
     try gproc_pool:pick_worker(ID, KEY) of
@@ -237,26 +229,60 @@ pick_call(Id, Key, Query, Timeout) ->
 pick_cast(Id, Key, Query) ->
     ?PICK(Id, Key, gen_statem:cast(Pid, Query)).
 
-do_resume(#{queue := Q, id := Id, name := Name} = St) ->
+do_resume(#{id := Id, name := Name} = St) ->
     case inflight_get_first(Name) of
         empty ->
-            retry_first_from_queue(Q, Id, St);
+            retry_queue(St);
         {Ref, FirstQuery} ->
-            retry_first_sync(Id, FirstQuery, Name, Ref, undefined, St)
+            %% We retry msgs in inflight window sync, as if we send them
+            %% async, they will be appended to the end of inflight window again.
+            retry_inflight_sync(Id, Ref, FirstQuery, Name, St)
     end.
 
-retry_first_from_queue(undefined, _Id, St) ->
+retry_queue(#{queue := undefined} = St) ->
     {next_state, running, St};
-retry_first_from_queue(Q, Id, St) ->
-    case replayq:peek(Q) of
-        empty ->
+retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) ->
+    case get_first_n_from_queue(Q, 1) of
+        [] ->
             {next_state, running, St};
-        ?Q_ITEM(FirstQuery) ->
-            retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St)
+        [?QUERY(_, Request, HasSent) = Query] ->
+            QueryOpts = #{inflight_name => maps:get(name, St)},
+            Result = call_query(configured, Id, Query, QueryOpts),
+            case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of
+                true ->
+                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                false ->
+                    retry_queue(St#{queue := drop_head(Q, Id)})
+            end
+    end;
+retry_queue(
+    #{
+        queue := Q,
+        id := Id,
+        enable_batch := true,
+        batch_size := BatchSize,
+        resume_interval := ResumeT
+    } = St
+) ->
+    case get_first_n_from_queue(Q, BatchSize) of
+        [] ->
+            {next_state, running, St};
+        Batch0 ->
+            QueryOpts = #{inflight_name => maps:get(name, St)},
+            Result = call_query(configured, Id, Batch0, QueryOpts),
+            %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
+            %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
+            Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0],
+            case batch_reply_caller(Id, Result, Batch) of
+                true ->
+                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                false ->
+                    retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)})
+            end
     end.
 
-retry_first_sync(
-    Id, ?QUERY(_, _, HasSent) = Query, Name, Ref, Q, #{resume_interval := ResumeT} = St0
+retry_inflight_sync(
+    Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0
 ) ->
     Result = call_query(sync, Id, Query, #{}),
     case handle_query_result(Id, Result, HasSent, false) of
@@ -265,25 +291,10 @@ retry_first_sync(
             {keep_state, St0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
-            %% We Send 'resume' to the end of the mailbox to give the worker
-            %% a chance to process 'query' requests.
-            St =
-                case Q of
-                    undefined ->
-                        inflight_drop(Name, Ref),
-                        St0;
-                    _ ->
-                        St0#{queue => drop_head(Id, Q)}
-                end,
-            {keep_state, St, {state_timeout, 0, resume}}
+            inflight_drop(Name, Ref),
+            do_resume(St0)
     end.
 
-drop_head(Id, Q) ->
-    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
-    ok = replayq:ack(Q1, AckRef),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
-    Q1.
-
 query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
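The retry path above leans on gen_statem's state_timeout action: each failed retry keeps the worker in the 'blocked' state and re-arms a timer whose expiry delivers a 'resume' event, which triggers the next retry round. Below is a minimal, self-contained sketch of that OTP pattern; the module and helper names are made up for illustration, and it is not the resource worker itself.

-module(resume_timeout_sketch).
-behaviour(gen_statem).

-export([start_link/0]).
-export([init/1, callback_mode/0, blocked/3, running/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

callback_mode() ->
    state_functions.

init([]) ->
    %% start blocked and schedule the first retry in 100 ms
    {ok, blocked, #{resume_interval => 100}, {state_timeout, 100, resume}}.

blocked(state_timeout, resume, #{resume_interval := ResumeT} = Data) ->
    case try_resend() of
        ok ->
            %% the resource is back: leave the blocked state
            {next_state, running, Data};
        {error, still_down} ->
            %% stay blocked and re-arm the timer, like retry_queue/1 above
            {keep_state, Data, {state_timeout, ResumeT, resume}}
    end;
blocked(_EventType, _Event, Data) ->
    {keep_state, Data}.

running(_EventType, _Event, Data) ->
    {keep_state, Data}.

%% stand-in for call_query/4 in the real worker; succeeds about half the time
try_resend() ->
    case rand:uniform(2) of
        1 -> ok;
        2 -> {error, still_down}
    end.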
@@ -299,7 +310,7 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St)
     Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts),
     case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
         true ->
-            Query = ?QUERY(From, Request, 1),
+            Query = ?QUERY(From, Request, false),
             {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
         false ->
             {keep_state, St}
@@ -330,29 +341,6 @@ flush(
            {keep_state, St1}
     end.
 
-maybe_append_queue(Id, undefined, _Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
-    undefined;
-maybe_append_queue(Id, Q, Items) ->
-    Q2 =
-        case replayq:overflow(Q) of
-            Overflow when Overflow =< 0 ->
-                Q;
-            Overflow ->
-                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
-                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
-                ok = replayq:ack(Q1, QAckRef),
-                Dropped = length(Items2),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
-                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
-                Q1
-        end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
-    replayq:append(Q2, Items).
-
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(
         fun(Reply, BlockWorker) ->
@@ -550,12 +538,67 @@ drop_inflight_and_resume(Pid, Name, Ref) ->
             inflight_drop(Name, Ref)
     end.
 
 %%==============================================================================
+%% operations for queue
+queue_item_marshaller(?Q_ITEM(_) = I) ->
+    term_to_binary(I);
+queue_item_marshaller(Bin) when is_binary(Bin) ->
+    binary_to_term(Bin).
+
+estimate_size(QItem) ->
+    size(queue_item_marshaller(QItem)).
+
+maybe_append_queue(Id, undefined, _Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    undefined;
+maybe_append_queue(Id, Q, Items) ->
+    Q2 =
+        case replayq:overflow(Q) of
+            Overflow when Overflow =< 0 ->
+                Q;
+            Overflow ->
+                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
+                ok = replayq:ack(Q1, QAckRef),
+                Dropped = length(Items2),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
+                Q1
+        end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+    replayq:append(Q2, Items).
+
+get_first_n_from_queue(Q, N) ->
+    get_first_n_from_queue(Q, N, []).
+
+get_first_n_from_queue(_Q, 0, Acc) ->
+    lists:reverse(Acc);
+get_first_n_from_queue(Q, N, Acc) when N > 0 ->
+    case replayq:peek(Q) of
+        empty -> Acc;
+        ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc])
+    end.
+
+drop_first_n_from_queue(Q, 0, _Id) ->
+    Q;
+drop_first_n_from_queue(Q, N, Id) when N > 0 ->
+    drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id).
+
+drop_head(Q, Id) ->
+    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
+    ok = replayq:ack(Q1, AckRef),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
+    Q1.
+
+%%==============================================================================
 %% the inflight queue for async query
 -define(SIZE_REF, -1).
 inflight_new(Name, InfltWinSZ) ->
     _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?SIZE_REF, {size, InfltWinSZ}),
+    inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}),
     ok.
 
 inflight_get_first(Name) ->
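The queue helpers added above (maybe_append_queue/3, get_first_n_from_queue/2, drop_first_n_from_queue/3, drop_head/2) are thin wrappers around replayq. Below is a minimal sketch of the append / peek / pop+ack cycle they rely on, assuming it sits in this module next to those helpers; the dir, seg_bytes and max_total_bytes values are made up, and the option names should be checked against the replayq version in use.

replayq_usage_sketch() ->
    Q0 = replayq:open(#{
        dir => "data/resource_worker_q",        %% hypothetical directory
        seg_bytes => 10 * 1024 * 1024,
        max_total_bytes => 100 * 1024 * 1024,   %% gives replayq:overflow/1 something to report
        sizer => fun estimate_size/1,
        marshaller => fun queue_item_marshaller/1
    }),
    Q1 = replayq:append(Q0, [?Q_ITEM(example_query)]),
    ?Q_ITEM(_) = replayq:peek(Q1),              %% peek reads the head without removing it
    {Q2, AckRef, _Items} = replayq:pop(Q1, #{count_limit => 1}),
    ok = replayq:ack(Q2, AckRef),               %% ack only after the send has succeeded
    replayq:close(Q2).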
@@ -575,7 +618,7 @@ inflight_get_first(Name) ->
 inflight_is_full(undefined) ->
     false;
 inflight_is_full(Name) ->
-    [{_, {size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
+    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
     case ets:info(Name, size) of
         Size when Size > MaxSize -> true;
         _ -> false
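The inflight window touched here is an ordered_set ETS table: entries are keyed by increasing integers, and the sentinel key ?SIZE_REF (-1) holds the window's maximum size, which the {size, _} -> {max_size, _} rename makes explicit. Below is a small standalone sketch of that pattern; the key generation and the oldest-entry lookup are assumptions for illustration, since the body of inflight_get_first/1 is not part of this diff.

inflight_window_sketch() ->
    Tab = ets:new(inflight_sketch, [ordered_set, public]),
    true = ets:insert(Tab, {-1, {max_size, 2}}),
    %% real entries sort after the sentinel because their keys are positive
    Ref1 = erlang:unique_integer([monotonic, positive]),
    Ref2 = erlang:unique_integer([monotonic, positive]),
    true = ets:insert(Tab, {Ref1, {inflight, <<"q1">>}}),
    true = ets:insert(Tab, {Ref2, {inflight, <<"q2">>}}),
    %% oldest in-flight entry = first key after the sentinel
    Oldest = ets:next(Tab, -1),
    [{_, {max_size, MaxSize}}] = ets:lookup(Tab, -1),
    %% a "window full" check in the same spirit as inflight_is_full/1
    Full = (ets:info(Tab, size) - 1) >= MaxSize,
    {Oldest, Full}.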