From 3cb36a5619199a93ed06a4df7f323f4bcf5f25aa Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 7 Mar 2024 15:53:07 +0100 Subject: [PATCH] feat(ds-lts): extract timestamp from storage key itself 1. This avoids the need to deserialize the message to get the timestamp. 2. It also makes possible to decouple the storage key timestamp from the message timestamp, which might be useful for replication purposes. --- .../src/emqx_ds_bitmask_keymapper.erl | 29 +++++-- .../src/emqx_ds_storage_bitfield_lts.erl | 86 ++++++++++--------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index e522f2a09..e98e235aa 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -112,7 +112,9 @@ vector_to_key/2, bin_vector_to_key/2, key_to_vector/2, + key_to_coord/3, bin_key_to_vector/2, + bin_key_to_coord/3, key_to_bitstring/2, bitstring_to_key/2, make_filter/2, @@ -297,13 +299,7 @@ bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = key_to_vector(#keymapper{vec_scanner = Scanner}, Key) -> lists:map( fun(Actions) -> - lists:foldl( - fun(Action, Acc) -> - Acc bor extract_inv(Key, Action) - end, - 0, - Actions - ) + extract_coord(Actions, Key) end, Scanner ). @@ -324,6 +320,16 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size = DimSizeof ). +-spec key_to_coord(keymapper(), key(), dimension()) -> coord(). +key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) -> + Actions = lists:nth(Dim, Scanner), + extract_coord(Actions, Key). + +-spec bin_key_to_coord(keymapper(), key(), dimension()) -> coord(). +bin_key_to_coord(Keymapper = #keymapper{key_size = Size}, BinKey, Dim) -> + <> = BinKey, + key_to_coord(Keymapper, Key, Dim). + %% @doc Transform a bitstring to a key -spec bitstring_to_key(keymapper(), bitstring()) -> scalar(). bitstring_to_key(#keymapper{key_size = Size}, Bin) -> @@ -680,6 +686,15 @@ extract_inv(Dest, #scan_action{ }) -> ((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset. +extract_coord(Actions, Key) -> + lists:foldl( + fun(Action, Acc) -> + Acc bor extract_inv(Key, Action) + end, + 0, + Actions + ). + ones(Bits) -> 1 bsl Bits - 1. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 59df2bbdc..326926d20 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -137,6 +137,9 @@ -include("emqx_ds_bitmask.hrl"). +-define(DIM_TOPIC, 1). +-define(DIM_TS, 2). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -481,39 +484,44 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> true = Key1 > Key0, case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> - {N, It, Acc} = - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0), + {N, It, Acc} = traverse_interval( + ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N0 + ), next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N); {error, invalid_iterator} -> {ok, It0, lists:reverse(Acc0)} end end. -traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) -> It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> Acc = [{Key, Msg} | Acc0], - traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> - traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N); - overflow -> - {0, It0, Acc0} + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; + overflow -> + {0, It0, Acc0}; false -> {N, It, Acc0} end. -traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) -> +traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; -traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> inc_counter(), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> - traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N); + traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); {error, invalid_iterator} -> {0, It, Acc} end. @@ -562,6 +570,7 @@ delete_traverse_interval(LoopContext0) -> storage_iter := It0, current_key := Key, current_val := Val, + keymapper := KeyMapper, filter := Filter, safe_cutoff_time := Cutoff, selector := Selector, @@ -572,10 +581,14 @@ delete_traverse_interval(LoopContext0) -> remaining := Remaining0 } = LoopContext0, It = It0#{?last_seen_key := Key}, - case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS), + case + emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso + check_timestamp(Cutoff, It, Timestamp) + of true -> Msg = deserialize(Val), - case check_message(Cutoff, It, Msg) of + case check_message(It, Msg) of true -> case Selector(Msg) of true -> @@ -588,10 +601,10 @@ delete_traverse_interval(LoopContext0) -> delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1}) end; false -> - delete_traverse_interval1(LoopContext0); - overflow -> - {0, It0, AccDel0, AccIter0} + delete_traverse_interval1(LoopContext0) end; + overflow -> + {0, It0, AccDel0, AccIter0}; false -> {Remaining0, It, AccDel0, AccIter0} end. @@ -619,32 +632,21 @@ delete_traverse_interval1(LoopContext0) -> {0, It, AccDel, AccIter} end. --spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) -> +-spec check_timestamp(emqx_ds:time(), iterator() | delete_iterator(), emqx_ds:time()) -> true | false | overflow. -check_message( - Cutoff, - _It, - #message{timestamp = Timestamp} -) when Timestamp >= Cutoff -> +check_timestamp(Cutoff, _It, Timestamp) when Timestamp >= Cutoff -> %% We hit the current epoch, we can't continue iterating over it yet. %% It would be unsafe otherwise: messages can be stored in the current epoch %% concurrently with iterating over it. They can end up earlier (in the iteration %% order) due to the nature of keymapping, potentially causing us to miss them. overflow; -check_message( - _Cutoff, - #{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message( - _Cutoff, - #{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter}, - #message{timestamp = Timestamp, topic = Topic} -) when Timestamp >= StartTime -> - emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter); -check_message(_Cutoff, _It, _Msg) -> - false. +check_timestamp(_Cutoff, #{?start_time := StartTime}, Timestamp) -> + Timestamp >= StartTime. + +-spec check_message(iterator() | delete_iterator(), emqx_types:message()) -> + true | false. +check_message(#{?topic_filter := TopicFilter}, #message{topic = Topic}) -> + emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter). format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], @@ -720,12 +722,12 @@ deserialize(Blob) -> %% erlfmt-ignore make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> Bitsources = - %% Dimension Offset Bitsize - [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index - {2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch - [{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels + %% Dimension Offset Bitsize + [{?DIM_TOPIC, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index + {?DIM_TS, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch + [{?DIM_TS + I, 0, BitsPerTopicLevel } %% Varying topic levels || I <- lists:seq(1, N)] ++ - [{2, 0, TSOffsetBits }], %% Timestamp offset + [{?DIM_TS, 0, TSOffsetBits }], %% Timestamp offset Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)), %% Assert: case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of