feat(ds-lts): extract timestamp from storage key itself
1. This avoids the need to deserialize the message to get the timestamp. 2. It also makes possible to decouple the storage key timestamp from the message timestamp, which might be useful for replication purposes.
This commit is contained in:
parent
5cc0246351
commit
3cb36a5619
|
@ -112,7 +112,9 @@
|
||||||
vector_to_key/2,
|
vector_to_key/2,
|
||||||
bin_vector_to_key/2,
|
bin_vector_to_key/2,
|
||||||
key_to_vector/2,
|
key_to_vector/2,
|
||||||
|
key_to_coord/3,
|
||||||
bin_key_to_vector/2,
|
bin_key_to_vector/2,
|
||||||
|
bin_key_to_coord/3,
|
||||||
key_to_bitstring/2,
|
key_to_bitstring/2,
|
||||||
bitstring_to_key/2,
|
bitstring_to_key/2,
|
||||||
make_filter/2,
|
make_filter/2,
|
||||||
|
@ -297,13 +299,7 @@ bin_vector_to_key(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
|
||||||
key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
|
key_to_vector(#keymapper{vec_scanner = Scanner}, Key) ->
|
||||||
lists:map(
|
lists:map(
|
||||||
fun(Actions) ->
|
fun(Actions) ->
|
||||||
lists:foldl(
|
extract_coord(Actions, Key)
|
||||||
fun(Action, Acc) ->
|
|
||||||
Acc bor extract_inv(Key, Action)
|
|
||||||
end,
|
|
||||||
0,
|
|
||||||
Actions
|
|
||||||
)
|
|
||||||
end,
|
end,
|
||||||
Scanner
|
Scanner
|
||||||
).
|
).
|
||||||
|
@ -324,6 +320,16 @@ bin_key_to_vector(Keymapper = #keymapper{vec_coord_size = DimSizeof, key_size =
|
||||||
DimSizeof
|
DimSizeof
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-spec key_to_coord(keymapper(), key(), dimension()) -> coord().
|
||||||
|
key_to_coord(#keymapper{vec_scanner = Scanner}, Key, Dim) ->
|
||||||
|
Actions = lists:nth(Dim, Scanner),
|
||||||
|
extract_coord(Actions, Key).
|
||||||
|
|
||||||
|
-spec bin_key_to_coord(keymapper(), key(), dimension()) -> coord().
|
||||||
|
bin_key_to_coord(Keymapper = #keymapper{key_size = Size}, BinKey, Dim) ->
|
||||||
|
<<Key:Size>> = BinKey,
|
||||||
|
key_to_coord(Keymapper, Key, Dim).
|
||||||
|
|
||||||
%% @doc Transform a bitstring to a key
|
%% @doc Transform a bitstring to a key
|
||||||
-spec bitstring_to_key(keymapper(), bitstring()) -> scalar().
|
-spec bitstring_to_key(keymapper(), bitstring()) -> scalar().
|
||||||
bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
|
bitstring_to_key(#keymapper{key_size = Size}, Bin) ->
|
||||||
|
@ -680,6 +686,15 @@ extract_inv(Dest, #scan_action{
|
||||||
}) ->
|
}) ->
|
||||||
((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset.
|
((Dest bsr DestOffset) band SrcBitmask) bsl SrcOffset.
|
||||||
|
|
||||||
|
extract_coord(Actions, Key) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(Action, Acc) ->
|
||||||
|
Acc bor extract_inv(Key, Action)
|
||||||
|
end,
|
||||||
|
0,
|
||||||
|
Actions
|
||||||
|
).
|
||||||
|
|
||||||
ones(Bits) ->
|
ones(Bits) ->
|
||||||
1 bsl Bits - 1.
|
1 bsl Bits - 1.
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,9 @@
|
||||||
|
|
||||||
-include("emqx_ds_bitmask.hrl").
|
-include("emqx_ds_bitmask.hrl").
|
||||||
|
|
||||||
|
-define(DIM_TOPIC, 1).
|
||||||
|
-define(DIM_TS, 2).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-endif.
|
-endif.
|
||||||
|
@ -481,39 +484,44 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) ->
|
||||||
true = Key1 > Key0,
|
true = Key1 > Key0,
|
||||||
case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
|
case rocksdb:iterator_move(ITHandle, {seek, Key1}) of
|
||||||
{ok, Key, Val} ->
|
{ok, Key, Val} ->
|
||||||
{N, It, Acc} =
|
{N, It, Acc} = traverse_interval(
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N0),
|
ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N0
|
||||||
|
),
|
||||||
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N);
|
next_loop(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N);
|
||||||
{error, invalid_iterator} ->
|
{error, invalid_iterator} ->
|
||||||
{ok, It0, lists:reverse(Acc0)}
|
{ok, It0, lists:reverse(Acc0)}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) ->
|
||||||
It = It0#{?last_seen_key := Key},
|
It = It0#{?last_seen_key := Key},
|
||||||
case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
|
Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS),
|
||||||
|
case
|
||||||
|
emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso
|
||||||
|
check_timestamp(Cutoff, It, Timestamp)
|
||||||
|
of
|
||||||
true ->
|
true ->
|
||||||
Msg = deserialize(Val),
|
Msg = deserialize(Val),
|
||||||
case check_message(Cutoff, It, Msg) of
|
case check_message(It, Msg) of
|
||||||
true ->
|
true ->
|
||||||
Acc = [{Key, Msg} | Acc0],
|
Acc = [{Key, Msg} | Acc0],
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N - 1);
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1);
|
||||||
false ->
|
false ->
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, It, Acc0, N);
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N)
|
||||||
overflow ->
|
|
||||||
{0, It0, Acc0}
|
|
||||||
end;
|
end;
|
||||||
|
overflow ->
|
||||||
|
{0, It0, Acc0};
|
||||||
false ->
|
false ->
|
||||||
{N, It, Acc0}
|
{N, It, Acc0}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
traverse_interval(_ITHandle, _Filter, _Cutoff, It, Acc, 0) ->
|
traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) ->
|
||||||
{0, It, Acc};
|
{0, It, Acc};
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, It, Acc, N) ->
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) ->
|
||||||
inc_counter(),
|
inc_counter(),
|
||||||
case rocksdb:iterator_move(ITHandle, next) of
|
case rocksdb:iterator_move(ITHandle, next) of
|
||||||
{ok, Key, Val} ->
|
{ok, Key, Val} ->
|
||||||
traverse_interval(ITHandle, Filter, Cutoff, Key, Val, It, Acc, N);
|
traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N);
|
||||||
{error, invalid_iterator} ->
|
{error, invalid_iterator} ->
|
||||||
{0, It, Acc}
|
{0, It, Acc}
|
||||||
end.
|
end.
|
||||||
|
@ -562,6 +570,7 @@ delete_traverse_interval(LoopContext0) ->
|
||||||
storage_iter := It0,
|
storage_iter := It0,
|
||||||
current_key := Key,
|
current_key := Key,
|
||||||
current_val := Val,
|
current_val := Val,
|
||||||
|
keymapper := KeyMapper,
|
||||||
filter := Filter,
|
filter := Filter,
|
||||||
safe_cutoff_time := Cutoff,
|
safe_cutoff_time := Cutoff,
|
||||||
selector := Selector,
|
selector := Selector,
|
||||||
|
@ -572,10 +581,14 @@ delete_traverse_interval(LoopContext0) ->
|
||||||
remaining := Remaining0
|
remaining := Remaining0
|
||||||
} = LoopContext0,
|
} = LoopContext0,
|
||||||
It = It0#{?last_seen_key := Key},
|
It = It0#{?last_seen_key := Key},
|
||||||
case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of
|
Timestamp = emqx_ds_bitmask_keymapper:bin_key_to_coord(KeyMapper, Key, ?DIM_TS),
|
||||||
|
case
|
||||||
|
emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) andalso
|
||||||
|
check_timestamp(Cutoff, It, Timestamp)
|
||||||
|
of
|
||||||
true ->
|
true ->
|
||||||
Msg = deserialize(Val),
|
Msg = deserialize(Val),
|
||||||
case check_message(Cutoff, It, Msg) of
|
case check_message(It, Msg) of
|
||||||
true ->
|
true ->
|
||||||
case Selector(Msg) of
|
case Selector(Msg) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -588,10 +601,10 @@ delete_traverse_interval(LoopContext0) ->
|
||||||
delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1})
|
delete_traverse_interval1(LoopContext0#{remaining := Remaining0 - 1})
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
delete_traverse_interval1(LoopContext0);
|
delete_traverse_interval1(LoopContext0)
|
||||||
overflow ->
|
|
||||||
{0, It0, AccDel0, AccIter0}
|
|
||||||
end;
|
end;
|
||||||
|
overflow ->
|
||||||
|
{0, It0, AccDel0, AccIter0};
|
||||||
false ->
|
false ->
|
||||||
{Remaining0, It, AccDel0, AccIter0}
|
{Remaining0, It, AccDel0, AccIter0}
|
||||||
end.
|
end.
|
||||||
|
@ -619,32 +632,21 @@ delete_traverse_interval1(LoopContext0) ->
|
||||||
{0, It, AccDel, AccIter}
|
{0, It, AccDel, AccIter}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec check_message(emqx_ds:time(), iterator() | delete_iterator(), emqx_types:message()) ->
|
-spec check_timestamp(emqx_ds:time(), iterator() | delete_iterator(), emqx_ds:time()) ->
|
||||||
true | false | overflow.
|
true | false | overflow.
|
||||||
check_message(
|
check_timestamp(Cutoff, _It, Timestamp) when Timestamp >= Cutoff ->
|
||||||
Cutoff,
|
|
||||||
_It,
|
|
||||||
#message{timestamp = Timestamp}
|
|
||||||
) when Timestamp >= Cutoff ->
|
|
||||||
%% We hit the current epoch, we can't continue iterating over it yet.
|
%% We hit the current epoch, we can't continue iterating over it yet.
|
||||||
%% It would be unsafe otherwise: messages can be stored in the current epoch
|
%% It would be unsafe otherwise: messages can be stored in the current epoch
|
||||||
%% concurrently with iterating over it. They can end up earlier (in the iteration
|
%% concurrently with iterating over it. They can end up earlier (in the iteration
|
||||||
%% order) due to the nature of keymapping, potentially causing us to miss them.
|
%% order) due to the nature of keymapping, potentially causing us to miss them.
|
||||||
overflow;
|
overflow;
|
||||||
check_message(
|
check_timestamp(_Cutoff, #{?start_time := StartTime}, Timestamp) ->
|
||||||
_Cutoff,
|
Timestamp >= StartTime.
|
||||||
#{?tag := ?IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
|
|
||||||
#message{timestamp = Timestamp, topic = Topic}
|
-spec check_message(iterator() | delete_iterator(), emqx_types:message()) ->
|
||||||
) when Timestamp >= StartTime ->
|
true | false.
|
||||||
emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
|
check_message(#{?topic_filter := TopicFilter}, #message{topic = Topic}) ->
|
||||||
check_message(
|
emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter).
|
||||||
_Cutoff,
|
|
||||||
#{?tag := ?DELETE_IT, ?start_time := StartTime, ?topic_filter := TopicFilter},
|
|
||||||
#message{timestamp = Timestamp, topic = Topic}
|
|
||||||
) when Timestamp >= StartTime ->
|
|
||||||
emqx_topic:match(emqx_topic:tokens(Topic), TopicFilter);
|
|
||||||
check_message(_Cutoff, _It, _Msg) ->
|
|
||||||
false.
|
|
||||||
|
|
||||||
format_key(KeyMapper, Key) ->
|
format_key(KeyMapper, Key) ->
|
||||||
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
|
Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)],
|
||||||
|
@ -720,12 +722,12 @@ deserialize(Blob) ->
|
||||||
%% erlfmt-ignore
|
%% erlfmt-ignore
|
||||||
make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
|
make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
|
||||||
Bitsources =
|
Bitsources =
|
||||||
%% Dimension Offset Bitsize
|
%% Dimension Offset Bitsize
|
||||||
[{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
|
[{?DIM_TOPIC, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
|
||||||
{2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch
|
{?DIM_TS, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch
|
||||||
[{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels
|
[{?DIM_TS + I, 0, BitsPerTopicLevel } %% Varying topic levels
|
||||||
|| I <- lists:seq(1, N)] ++
|
|| I <- lists:seq(1, N)] ++
|
||||||
[{2, 0, TSOffsetBits }], %% Timestamp offset
|
[{?DIM_TS, 0, TSOffsetBits }], %% Timestamp offset
|
||||||
Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
|
Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
|
||||||
%% Assert:
|
%% Assert:
|
||||||
case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
|
case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
|
||||||
|
|
Loading…
Reference in New Issue