fix(ds): don't iterate over incomplete epoch in bitmask lts storage

This commit is contained in:
Andrew Mayorov 2023-10-30 19:58:59 +07:00 committed by ieQu1
parent 46d8301bc0
commit 7a94db25c3
1 changed files with 78 additions and 47 deletions

View File

@ -59,7 +59,8 @@
db :: rocksdb:db_handle(),
data :: rocksdb:cf_handle(),
trie :: emqx_ds_lts:trie(),
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper())
keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()),
ts_offset :: non_neg_integer()
}).
-type s() :: #s{}.
@ -147,7 +148,13 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
|| N <- lists:seq(0, MaxWildcardLevels)
]
),
#s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}.
#s{
db = DBHandle,
data = DataCF,
trie = Trie,
keymappers = KeymapperCache,
ts_offset = TSOffsetBits
}.
-spec store_batch(
emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
@ -177,13 +184,26 @@ make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, Sta
storage_key = StorageKey
}}.
next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
%% Compute safe cutoff time.
%% It's the point in time where the last complete epoch ends, so we need to know
%% the current time to compute it.
Now = emqx_message:timestamp_now(),
SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
next_until(Schema, It, SafeCutoffTime, BatchSize).
next_until(_Schema, It, SafeCutoffTime, _BatchSize) when It#it.start_time >= SafeCutoffTime ->
%% We're in the middle of the current epoch, so we can't yet iterate over it.
%% It would be unsafe otherwise: messages can be stored in the current epoch
%% concurrently with iterating over it. They can end up earlier (in the iteration
%% order) due to the nature of keymapping, potentially causing us to miss them.
{ok, It, []};
next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, BatchSize) ->
#it{
start_time = StartTime,
storage_key = StorageKey
} = It0,
storage_key = {TopicIndex, Varying}
} = It,
%% Make filter:
{TopicIndex, Varying} = StorageKey,
Inequations = [
{'=', TopicIndex},
{'>=', StartTime}
@ -197,10 +217,8 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
Varying
)
],
%% Obtain a keymapper for the current number of varying
%% levels. Magic constant 2: we have two extra dimensions of topic
%% index and time; the rest of dimensions are varying levels.
NVarying = length(Inequations) - 2,
%% Obtain a keymapper for the current number of varying levels.
NVarying = length(Varying),
%% Assert:
NVarying =< ?WILDCARD_LIMIT orelse
error({too_many_varying_topic_levels, NVarying}),
@ -215,7 +233,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
]),
try
put(?COUNTER, 0),
next_loop(ITHandle, Keymapper, Filter, It0, [], BatchSize)
next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize)
after
rocksdb:iterator_close(ITHandle),
erase(?COUNTER)
@ -225,9 +243,9 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) ->
%% Internal functions
%%================================================================================
next_loop(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) ->
next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
{ok, It, lists:reverse(Acc)};
next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) ->
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
inc_counter(),
#it{last_seen_key = Key0} = It0,
case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of
@ -238,51 +256,64 @@ next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) ->
true = Key1 > Key0,
case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
{ok, Key, Val} ->
It1 = It0#it{last_seen_key = Key},
case check_message(Filter, It1, Val) of
{true, Msg} ->
N1 = N0 - 1,
Acc1 = [Msg | Acc0];
false ->
N1 = N0,
Acc1 = Acc0
end,
{N, It, Acc} = traverse_interval(ITHandle, KeyMapper, Filter, It1, Acc1, N1),
next_loop(ITHandle, KeyMapper, Filter, It, Acc, N);
{N, It, Acc} =
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0),
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N);
{error, invalid_iterator} ->
{ok, It0, lists:reverse(Acc0)}
end
end.
traverse_interval(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) ->
{0, It, Acc};
traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) ->
inc_counter(),
case rocksdb:iterator_move(ITHandle, next) of
{ok, Key, Val} ->
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
It = It0#it{last_seen_key = Key},
case check_message(Filter, It, Val) of
{true, Msg} ->
traverse_interval(ITHandle, KeyMapper, Filter, It, [Msg | Acc], N - 1);
false ->
traverse_interval(ITHandle, KeyMapper, Filter, It, Acc, N)
end;
{error, invalid_iterator} ->
{0, It0, Acc}
end.
-spec check_message(emqx_ds_bitmask_keymapper:filter(), iterator(), binary()) ->
{true, emqx_types:message()} | false.
check_message(Filter, #it{last_seen_key = Key}, Val) ->
case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
true ->
Msg = deserialize(Val),
%% TODO: check strict time and hash collisions
{true, Msg};
case check_message(Cutoff, It, Msg) of
true ->
Acc = [Msg | Acc0],
traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1);
false ->
false
traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N);
overflow ->
{0, It0, Acc0}
end;
false ->
{N, It, Acc0}
end.
traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) ->
{0, It, Acc};
traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) ->
inc_counter(),
case rocksdb:iterator_move(ITHandle, next) of
{ok, Key, Val} ->
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N);
{error, invalid_iterator} ->
{0, It, Acc}
end.
-spec check_message(emqx_ds:time(), iterator(), emqx_types:message()) ->
true | false | overflow.
check_message(
Cutoff,
_It,
#message{timestamp = Timestamp}
) when Timestamp >= Cutoff ->
%% We hit the current epoch, we can't continue iterating over it yet.
%% It would be unsafe otherwise: messages can be stored in the current epoch
%% concurrently with iterating over it. They can end up earlier (in the iteration
%% order) due to the nature of keymapping, potentially causing us to miss them.
overflow;
check_message(
_Cutoff,
#it{start_time = StartTime, topic_filter = TopicFilter},
#message{timestamp = Timestamp, topic = Topic}
) when Timestamp >= StartTime ->
emqx_topic:match(emqx_topic:words(Topic), TopicFilter);
check_message(_Cutoff, _It, _Msg) ->
false.
format_key(KeyMapper, Key) ->
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])).