diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 4f7344772..f07c678b3 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -57,8 +57,6 @@ -elvis([{elvis_style, dont_repeat_yourself, disable}]). --define(Q_ITEM(REQUEST), {q_item, REQUEST}). - -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). @@ -472,8 +470,7 @@ flush(Data0) -> Data = cancel_flush_timer(Data0), {keep_state, Data}; _ -> - {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}), - Batch = [Item || ?Q_ITEM(Item) <- Batch0], + {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}), IsBatch = BatchSize =/= 1, %% We *must* use the new queue, because we currently can't %% `nack' a `pop'. @@ -862,16 +859,18 @@ drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> %%============================================================================== %% operations for queue -queue_item_marshaller(?Q_ITEM(_) = I) -> - term_to_binary(I); queue_item_marshaller(Bin) when is_binary(Bin) -> - binary_to_term(Bin). + binary_to_term(Bin); +queue_item_marshaller(Item) -> + term_to_binary(Item). estimate_size(QItem) -> - size(queue_item_marshaller(QItem)). + erlang:external_size(QItem). -spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q(). -append_queue(Id, Index, Q, Queries) -> +append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> + %% we must not append a raw binary because the marshaller will get + %% lost. Q2 = case replayq:overflow(Q) of Overflow when Overflow =< 0 -> @@ -885,22 +884,19 @@ append_queue(Id, Index, Q, Queries) -> ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, - Items = [?Q_ITEM(X) || X <- Queries], - Q3 = replayq:append(Q2, Items), + Q3 = replayq:append(Q2, Queries), emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), Q3. -spec get_first_n_from_queue(replayq:q(), pos_integer()) -> - empty | {replayq:q(), replayq:ack_ref(), [?Q_ITEM(?QUERY(_From, _Request, _HasBeenSent))]}. + empty | {replayq:q(), replayq:ack_ref(), [?QUERY(_From, _Request, _HasBeenSent)]}. get_first_n_from_queue(Q, N) -> case replayq:count(Q) of 0 -> empty; _ -> - {NewQ, QAckRef, Items} = replayq:pop(Q, #{count_limit => N}), - Queries = [X || ?Q_ITEM(X) <- Items], - {NewQ, QAckRef, Queries} + replayq:pop(Q, #{count_limit => N}) end. %%==============================================================================