diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 85f4f5aa7..4dddaff67 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -59,7 +59,8 @@ db :: rocksdb:db_handle(), data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), - keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()) + keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), + ts_offset :: non_neg_integer() }). -type s() :: #s{}. @@ -147,7 +148,13 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> || N <- lists:seq(0, MaxWildcardLevels) ] ), - #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}. + #s{ + db = DBHandle, + data = DataCF, + trie = Trie, + keymappers = KeymapperCache, + ts_offset = TSOffsetBits + }. -spec store_batch( emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() @@ -177,13 +184,26 @@ make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, Sta storage_key = StorageKey }}. -next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> +next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> + %% Compute safe cutoff time. + %% It's the point in time where the last complete epoch ends, so we need to know + %% the current time to compute it. + Now = emqx_message:timestamp_now(), + SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + next_until(Schema, It, SafeCutoffTime, BatchSize). + +next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime -> + %% We're in the middle of the current epoch, so we can't yet iterate over it. + %% It would be unsafe otherwise: messages can be stored in the current epoch + %% concurrently with iterating over it. They can end up earlier (in the iteration + %% order) due to the nature of keymapping, potentially causing us to miss them. + {ok, It, []}; +next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) -> #it{ start_time = StartTime, - storage_key = StorageKey - } = It0, + storage_key = {TopicIndex, Varying} + } = It, %% Make filter: - {TopicIndex, Varying} = StorageKey, Inequations = [ {'=', TopicIndex}, {'>=', StartTime} @@ -197,10 +217,8 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> Varying ) ], - %% Obtain a keymapper for the current number of varying - %% levels. Magic constant 2: we have two extra dimensions of topic - %% index and time; the rest of dimensions are varying levels. - NVarying = length(Inequations) - 2, + %% Obtain a keymapper for the current number of varying levels. + NVarying = length(Varying), %% Assert: NVarying =< ?WILDCARD_LIMIT orelse error({too_many_varying_topic_levels, NVarying}), @@ -215,7 +233,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> ]), try put(?COUNTER, 0), - next_loop(ITHandle, Keymapper, Filter, It0, [], BatchSize) + next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) after rocksdb:iterator_close(ITHandle), erase(?COUNTER) @@ -225,9 +243,9 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> %% Internal functions %%================================================================================ -next_loop(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) -> +next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; -next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) -> +next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> inc_counter(), #it{last_seen_key = Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of @@ -238,51 +256,64 @@ next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) -> true = Key1 > Key0, case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> - It1 = It0#it{last_seen_key = Key}, - case check_message(Filter, It1, Val) of - {true, Msg} -> - N1 = N0 - 1, - Acc1 = [Msg | Acc0]; - false -> - N1 = N0, - Acc1 = Acc0 - end, - {N, It, Acc} = traverse_interval(ITHandle, KeyMapper, Filter, It1, Acc1, N1), - next_loop(ITHandle, KeyMapper, Filter, It, Acc, N); + {N, It, Acc} = + traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0), + next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N); {error, invalid_iterator} -> {ok, It0, lists:reverse(Acc0)} end end. -traverse_interval(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) -> - {0, It, Acc}; -traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) -> - inc_counter(), - case rocksdb:iterator_move(ITHandle, next) of - {ok, Key, Val} -> - It = It0#it{last_seen_key = Key}, - case check_message(Filter, It, Val) of - {true, Msg} -> - traverse_interval(ITHandle, KeyMapper, Filter, It, [Msg | Acc], N - 1); - false -> - traverse_interval(ITHandle, KeyMapper, Filter, It, Acc, N) - end; - {error, invalid_iterator} -> - {0, It0, Acc} - end. - --spec check_message(emqx_ds_bitmask_keymapper:filter(), iterator(), binary()) -> - {true, emqx_types:message()} | false. -check_message(Filter, #it{last_seen_key = Key}, Val) -> +traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> + It = It0#it{last_seen_key = Key}, case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of true -> Msg = deserialize(Val), - %% TODO: check strict time and hash collisions - {true, Msg}; + case check_message(Cutoff, It, Msg) of + true -> + Acc = [Msg | Acc0], + traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); + false -> + traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); + overflow -> + {0, It0, Acc0} + end; false -> - false + {N, It, Acc0} end. +traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) -> + {0, It, Acc}; +traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) -> + inc_counter(), + case rocksdb:iterator_move(ITHandle, next) of + {ok, Key, Val} -> + traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N); + {error, invalid_iterator} -> + {0, It, Acc} + end. + +-spec check_message(emqx_ds:time(), iterator(), emqx_types:message()) -> + true | false | overflow. +check_message( + Cutoff, + _It, + #message{timestamp = Timestamp} +) when Timestamp >= Cutoff -> + %% We hit the current epoch, we can't continue iterating over it yet. + %% It would be unsafe otherwise: messages can be stored in the current epoch + %% concurrently with iterating over it. They can end up earlier (in the iteration + %% order) due to the nature of keymapping, potentially causing us to miss them. + overflow; +check_message( + _Cutoff, + #it{start_time = StartTime, topic_filter = TopicFilter}, + #message{timestamp = Timestamp, topic = Topic} +) when Timestamp >= StartTime -> + emqx_topic:match(emqx_topic:words(Topic), TopicFilter); +check_message(_Cutoff, _It, _Msg) -> + false. + format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).