feat(ds): Ignore safe cutoff time for streams without varying levels

This commit is contained in:
ieQu1 2024-04-22 17:39:31 +02:00
parent 004dc80fb2
commit 9999ccd36c
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 35 additions and 7 deletions

View File

@ -89,6 +89,7 @@
data :: rocksdb:cf_handle(),
trie :: emqx_ds_lts:trie(),
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
ts_bits :: non_neg_integer(),
ts_offset :: non_neg_integer()
}).
@ -213,7 +214,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
data = DataCF,
trie = Trie,
keymappers = KeymapperCache,
ts_offset = TSOffsetBits
ts_offset = TSOffsetBits,
ts_bits = TSBits
}.
-spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) ->
@ -348,13 +350,39 @@ update_iterator(
) ->
{ok, OldIter#{?last_seen_key => DSKey}}.
next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
%% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know
%% the current time to compute it.
next(
Shard,
Schema = #s{ts_offset = TSOffset, ts_bits = TSBits},
It = #{?topic_filter := TF, ?storage_key := Stream},
BatchSize
) ->
init_counters(),
%% Compute safe cutoff time. It's the point in time where the last
%% complete epoch ends, so we need to know the current time to
%% compute it. This is needed because new keys can be added before
%% the iterator.
IsWildcard =
case Stream of
{_StaticKey, []} -> false;
_ -> true
end,
SafeCutoffTime =
case IsWildcard of
true ->
Now = emqx_ds:timestamp_us(),
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
(Now bsr TSOffset) bsl TSOffset;
false ->
%% Iterators scanning streams without varying topic
%% levels can operate on incomplete epochs, since new
%% matching keys for the single topic are added in
%% lexicographic order.
%%
%% Note: this DOES NOT apply to non-wildcard topic
%% filters operating on streams with varying parts:
%% iterator can jump to the next topic and then it
%% won't backtrack.
1 bsl TSBits - 1
end,
try
next_until(Schema, It, SafeCutoffTime, BatchSize)
after