refactor(buffer_worker): remove `?Q_ITEM` wrapping and use lightweight size estimate

This commit is contained in:
Thales Macedo Garitezi 2023-01-11 16:19:32 -03:00
parent 32a9e60313
commit 4c04a01370
1 changed files with 11 additions and 15 deletions

View File

@ -57,8 +57,6 @@
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-define(Q_ITEM(REQUEST), {q_item, REQUEST}).
-define(COLLECT_REQ_LIMIT, 1000).
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
-define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}).
@ -472,8 +470,7 @@ flush(Data0) ->
Data = cancel_flush_timer(Data0),
{keep_state, Data};
_ ->
{Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}),
Batch = [Item || ?Q_ITEM(Item) <- Batch0],
{Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
IsBatch = BatchSize =/= 1,
%% We *must* use the new queue, because we currently can't
%% `nack' a `pop'.
@ -862,16 +859,18 @@ drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
%%==============================================================================
%% operations for queue
queue_item_marshaller(?Q_ITEM(_) = I) ->
term_to_binary(I);
queue_item_marshaller(Bin) when is_binary(Bin) ->
binary_to_term(Bin).
binary_to_term(Bin);
queue_item_marshaller(Item) ->
term_to_binary(Item).
estimate_size(QItem) ->
size(queue_item_marshaller(QItem)).
erlang:external_size(QItem).
-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q().
append_queue(Id, Index, Q, Queries) ->
append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
%% we must not append a raw binary because the marshaller will get
%% lost.
Q2 =
case replayq:overflow(Q) of
Overflow when Overflow =< 0 ->
@ -885,22 +884,19 @@ append_queue(Id, Index, Q, Queries) ->
?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
Q1
end,
Items = [?Q_ITEM(X) || X <- Queries],
Q3 = replayq:append(Q2, Items),
Q3 = replayq:append(Q2, Queries),
emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)),
?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}),
Q3.
-spec get_first_n_from_queue(replayq:q(), pos_integer()) ->
empty | {replayq:q(), replayq:ack_ref(), [?Q_ITEM(?QUERY(_From, _Request, _HasBeenSent))]}.
empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}.
get_first_n_from_queue(Q, N) ->
case replayq:count(Q) of
0 ->
empty;
_ ->
{NewQ, QAckRef, Items} = replayq:pop(Q, #{count_limit => N}),
Queries = [X || ?Q_ITEM(X) <- Items],
{NewQ, QAckRef, Queries}
replayq:pop(Q, #{count_limit => N})
end.
%%==============================================================================