refactor: use monotonic times as refs and store initial times when creating ets
with this, we may measure latencies in the future.
This commit is contained in:
parent
3ba65c4377
commit
b82009bc29
|
@ -924,17 +924,24 @@ append_queue(Id, Index, Q, Queries) when not is_binary(Q) ->
|
||||||
|
|
||||||
%%==============================================================================
|
%%==============================================================================
|
||||||
%% the inflight queue for async query
|
%% the inflight queue for async query
|
||||||
-define(MAX_SIZE_REF, -1).
|
-define(MAX_SIZE_REF, max_size).
|
||||||
-define(SIZE_REF, -2).
|
-define(SIZE_REF, size).
|
||||||
|
-define(INITIAL_TIME_REF, initial_time).
|
||||||
|
-define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time).
|
||||||
|
|
||||||
inflight_new(InfltWinSZ, Id, Index) ->
|
inflight_new(InfltWinSZ, Id, Index) ->
|
||||||
TableId = ets:new(
|
TableId = ets:new(
|
||||||
emqx_resource_worker_inflight_tab,
|
emqx_resource_worker_inflight_tab,
|
||||||
[ordered_set, public, {write_concurrency, true}]
|
[ordered_set, public, {write_concurrency, true}]
|
||||||
),
|
),
|
||||||
inflight_append(TableId, {?MAX_SIZE_REF, {max_size, InfltWinSZ}}, Id, Index),
|
inflight_append(TableId, {?MAX_SIZE_REF, InfltWinSZ}, Id, Index),
|
||||||
%% we use this counter because we might deal with batches as
|
%% we use this counter because we might deal with batches as
|
||||||
%% elements.
|
%% elements.
|
||||||
inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
|
inflight_append(TableId, {?SIZE_REF, 0}, Id, Index),
|
||||||
|
inflight_append(TableId, {?INITIAL_TIME_REF, erlang:system_time()}, Id, Index),
|
||||||
|
inflight_append(
|
||||||
|
TableId, {?INITIAL_MONOTONIC_TIME_REF, erlang:monotonic_time(nanosecond)}, Id, Index
|
||||||
|
),
|
||||||
TableId.
|
TableId.
|
||||||
|
|
||||||
-spec inflight_get_first_retriable(ets:tid()) ->
|
-spec inflight_get_first_retriable(ets:tid()) ->
|
||||||
|
@ -958,7 +965,7 @@ inflight_get_first_retriable(InflightTID) ->
|
||||||
is_inflight_full(undefined) ->
|
is_inflight_full(undefined) ->
|
||||||
false;
|
false;
|
||||||
is_inflight_full(InflightTID) ->
|
is_inflight_full(InflightTID) ->
|
||||||
[{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
|
[{_, MaxSize}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
|
||||||
%% we consider number of batches rather than number of messages
|
%% we consider number of batches rather than number of messages
|
||||||
%% because one batch request may hold several messages.
|
%% because one batch request may hold several messages.
|
||||||
Size = inflight_num_batches(InflightTID),
|
Size = inflight_num_batches(InflightTID),
|
||||||
|
@ -1165,7 +1172,7 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
|
||||||
St#{tref => undefined}.
|
St#{tref => undefined}.
|
||||||
|
|
||||||
make_message_ref() ->
|
make_message_ref() ->
|
||||||
erlang:unique_integer([monotonic, positive]).
|
erlang:monotonic_time(nanosecond).
|
||||||
|
|
||||||
collect_requests(Acc, Limit) ->
|
collect_requests(Acc, Limit) ->
|
||||||
Count = length(Acc),
|
Count = length(Acc),
|
||||||
|
|
Loading…
Reference in New Issue