diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2ec6674b6..1cbdb92ee 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -89,6 +89,7 @@ data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), + ts_bits :: non_neg_integer(), ts_offset :: non_neg_integer() }). @@ -213,7 +214,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> data = DataCF, trie = Trie, keymappers = KeymapperCache, - ts_offset = TSOffsetBits + ts_offset = TSOffsetBits, + ts_bits = TSBits }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -348,13 +350,39 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> - %% Compute safe cutoff time. - %% It's the point in time where the last complete epoch ends, so we need to know - %% the current time to compute it. +next( + Shard, + Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, + It = #{?topic_filter := TF, ?storage_key := Stream}, + BatchSize +) -> init_counters(), - Now = emqx_ds:timestamp_us(), - SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + %% Compute safe cutoff time. It's the point in time where the last + %% complete epoch ends, so we need to know the current time to + %% compute it. This is needed because new keys can be added before + %% the iterator. + IsWildcard = + case Stream of + {_StaticKey, []} -> false; + _ -> true + end, + SafeCutoffTime = + case IsWildcard of + true -> + Now = emqx_ds:timestamp_us(), + (Now bsr TSOffset) bsl TSOffset; + false -> + %% Iterators scanning streams without varying topic + %% levels can operate on incomplete epochs, since new + %% matching keys for the single topic are added in + %% lexicographic order. + %% + %% Note: this DOES NOT apply to non-wildcard topic + %% filters operating on streams with varying parts: + %% iterator can jump to the next topic and then it + %% won't backtrack. + 1 bsl TSBits - 1 + end, try next_until(Schema, It, SafeCutoffTime, BatchSize) after