From b82009bc2955760259b696e242efa702a35865ce Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 16 Jan 2023 18:16:56 -0300 Subject: [PATCH] refactor: use monotonic times as refs and store initial times when creating ets with this, we may measure latencies in the future. --- apps/emqx_resource/src/emqx_resource_worker.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 6875341b4..b1a34355b 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -924,17 +924,24 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) -> %%============================================================================== %% the inflight queue for async query --define(MAX_SIZE_REF, -1). --define(SIZE_REF, -2). +-define(MAX_SIZE_REF, max_size). +-define(SIZE_REF, size). +-define(INITIAL_TIME_REF, initial_time). +-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). + inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( emqx_resource_worker_inflight_tab, [ordered_set, public, {write_concurrency, true}] ), - inflight_append(TableId, {?MAX_SIZE_REF, {max_size, InfltWinSZ}}, Id, Index), + inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index), %% we use this counter because we might deal with batches as %% elements. inflight_append(TableId, {?SIZE_REF, 0}, Id, Index), + inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index), + inflight_append( + TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index + ), TableId. -spec inflight_get_first_retriable(ets:tid()) -> @@ -958,7 +965,7 @@ inflight_get_first_retriable(InflightTID) -> is_inflight_full(undefined) -> false; is_inflight_full(InflightTID) -> - [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), + [{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages %% because one batch request may hold several messages. Size = inflight_num_batches(InflightTID), @@ -1165,7 +1172,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> St#{tref => undefined}. make_message_ref() -> - erlang:unique_integer([monotonic, positive]). + erlang:monotonic_time(nanosecond). collect_requests(Acc, Limit) -> Count = length(Acc),