diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index e522f2a09..e98e235aa 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -112,7 +112,9 @@ vector_to_key/2, bin_vector_to_key/2, key_to_vector/2, + key_to_coord/3, bin_key_to_vector/2, + bin_key_to_coord/3, key_to_bitstring/2, bitstring_to_key/2, make_filter/2, @@ -297,13 +299,7 @@ bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = key_to_vector(#keymapper{vec_scanner = Scanner}, Key) -> lists:map( fun(Actions) -> - lists:foldl( - fun(Action, Acc) -> - Acc bor extract_inv(Key, Action) - end, - 0, - Actions - ) + extract_coord(Actions, Key) end, Scanner ). @@ -324,6 +320,16 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = DimSizeof ). +-spec key_to_coord(keymapper(), key(), dimension()) -> coord(). +key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) -> + Actions = lists:nth(Dim, Scanner), + extract_coord(Actions, Key). + +-spec bin_key_to_coord(keymapper(), key(), dimension()) -> coord(). +bin_key_to_coord(Keymapper = #keymapper{key_size = Size}, BinKey, Dim) -> + <<Key:Size>> = BinKey, + key_to_coord(Keymapper, Key, Dim). + %% @doc Transform a bitstring to a key -spec bitstring_to_key(keymapper(), bitstring()) -> scalar(). bitstring_to_key(#keymapper{key_size = Size}, Bin) -> @@ -680,6 +686,15 @@ extract_inv(Dest, #scan_action{ }) -> ((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset. +extract_coord(Actions, Key) -> + lists:foldl( + fun(Action, Acc) -> + Acc bor extract_inv(Key, Action) + end, + 0, + Actions + ). + ones(Bits) -> 1 bsl Bits - 1. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 59df2bbdc..326926d20 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -137,6 +137,9 @@ -include("emqx_ds_bitmask.hrl"). +-define(DIM_TOPIC, 1). +-define(DIM_TS, 2). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -481,39 +484,44 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> true = Key1 > Key0, case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> - {N, It, Acc} = - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0), + {N, It, Acc} = traverse_interval( + ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N0 + ), next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N); {error, invalid_iterator} -> {ok, It0, lists:reverse(Acc0)} end end. -traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -> It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> Acc = [{Key, Msg} | Acc0], - traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> - traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); - overflow -> - {0, It0, Acc0} + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; + overflow -> + {0, It0, Acc0}; false -> {N, It, Acc0} end. 
-traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) -> +traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; -traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> inc_counter(), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); {error, invalid_iterator} -> {0, It, Acc} end. @@ -562,6 +570,7 @@ delete_traverse_interval(LoopContext0) -> storage_iter := It0, current_key := Key, current_val := Val, + keymapper := KeyMapper, filter := Filter, safe_cutoff_time := Cutoff, selector := Selector, @@ -572,10 +581,14 @@ delete_traverse_interval(LoopContext0) -> remaining := Remaining0 } = LoopContext0, It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> case Selector(Msg) of true -> @@ -588,10 +601,10 @@ delete_traverse_interval(LoopContext0) -> delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1}) end; false -> - delete_traverse_interval1(LoopContext0); - overflow -> - {0, It0, AccDel0, AccIter0} + delete_traverse_interval1(LoopContext0) end; + overflow -> + {0, It0, AccDel0, AccIter0}; false -> {Remaining0, It, AccDel0, AccIter0} end. @@ -619,32 +632,21 @@ delete_traverse_interval1(LoopContext0) -> {0, It, AccDel, AccIter} end. 
--spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) -> +-spec check_timestamp(emqx_ds:time(), iterator() | delete_iterator(), emqx_ds:time()) -> true | false | overflow. -check_message( - Cutoff, - _It, - #message{timestamp = Timestamp} -) when Timestamp >= Cutoff -> +check_timestamp(Cutoff, _It, Timestamp) when Timestamp >= Cutoff -> %% We hit the current epoch, we can't continue iterating over it yet. %% It would be unsafe otherwise: messages can be stored in the current epoch %% concurrently with iterating over it. They can end up earlier (in the iteration %% order) due to the nature of keymapping, potentially causing us to miss them. overflow; -check_message( - _Cutoff, - #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message( - _Cutoff, - #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message(_Cutoff, _It, _Msg) -> - false. +check_timestamp(_Cutoff, #{?start_time := StartTime}, Timestamp) -> + Timestamp >= StartTime. + +-spec check_message(iterator() | delete_iterator(), emqx_types:message()) -> + true | false. +check_message(#{?topic_filter := TopicFilter}, #message{topic = Topic}) -> + emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter). 
format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], @@ -720,12 +722,12 @@ deserialize(Blob) -> %% erlfmt-ignore make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> Bitsources = - %% Dimension Offset Bitsize - [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index - {2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch - [{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels + %% Dimension Offset Bitsize + [{?DIM_TOPIC, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index + {?DIM_TS, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch + [{?DIM_TS + I, 0, BitsPerTopicLevel } %% Varying topic levels || I <- lists:seq(1, N)] ++ - [{2, 0, TSOffsetBits }], %% Timestamp offset + [{?DIM_TS, 0, TSOffsetBits }], %% Timestamp offset Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)), %% Assert: case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of