From ef46c09cafe087c27fa515e4bcdc8515b428d9df Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 14 Oct 2023 01:01:10 +0200 Subject: [PATCH] feat(ds): Implement ratchet function for bitmask keymapper --- apps/emqx/src/emqx_persistent_message.erl | 5 +- .../src/emqx_ds_bitmask.hrl | 36 + .../src/emqx_ds_bitmask_keymapper.erl | 581 +++++++------- .../src/emqx_ds_storage_bitfield_lts.erl | 187 ++--- .../src/emqx_ds_storage_layer.erl_ | 714 ----------------- .../src/emqx_ds_storage_layer_bitmask.erl_ | 748 ------------------ .../emqx_ds_storage_bitfield_lts_SUITE.erl | 124 +-- 7 files changed, 436 insertions(+), 1959 deletions(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_bitmask.hrl delete mode 100644 apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ delete mode 100644 apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index f3ec9def5..632ff2a27 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -40,7 +40,10 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}), + ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ + backend => builtin, + storage => {emqx_ds_storage_bitfield_lts, #{}} + }), ok = emqx_persistent_session_ds_router:init_tables(), ok = emqx_persistent_session_ds:create_tables(), ok diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask.hrl b/apps/emqx_durable_storage/src/emqx_ds_bitmask.hrl new file mode 100644 index 000000000..31af0e034 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask.hrl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. 
+%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_BITMASK_HRL). +-define(EMQX_DS_BITMASK_HRL, true). + +-record(filter_scan_action, { + offset :: emqx_ds_bitmask_keymapper:offset(), + size :: emqx_ds_bitmask_keymapper:bitsize(), + min :: non_neg_integer(), + max :: non_neg_integer() +}). + +-record(filter, { + size :: non_neg_integer(), + bitfilter :: non_neg_integer(), + bitmask :: non_neg_integer(), + %% Ranges (in _bitsource_ basis): + bitsource_ranges :: array:array(#filter_scan_action{}), + range_min :: non_neg_integer(), + range_max :: non_neg_integer() +}). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index 5c3ae42d8..a512a141c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -86,9 +86,12 @@ bin_vector_to_key/2, key_to_vector/2, bin_key_to_vector/2, - next_range/3, key_to_bitstring/2, bitstring_to_key/2, + make_filter/2, + ratchet/2, + bin_increment/2, + bin_checkmask/2, bitsize/1 ]). @@ -149,6 +152,10 @@ -type scalar_range() :: any | {'=', scalar() | infinity} | {'>=', scalar()}. +-include("emqx_ds_bitmask.hrl"). + +-type filter() :: #filter{}. 
+ %%================================================================================ %% API functions %%================================================================================ @@ -237,36 +244,6 @@ bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, B lists:zip(Vector, DimSizeof) ). -%% @doc Given a keymapper, a filter, and a key, return a triple containing: -%% -%% 1. `NextKey', a key that is greater than the given one, and is -%% within the given range. -%% -%% 2. `Bitmask' -%% -%% 3. `Bitfilter' -%% -%% Bitmask and bitfilter can be used to verify that key any K is in -%% the range using the following inequality: -%% -%% K >= NextKey && (K band Bitmask) =:= Bitfilter. -%% -%% ...or `undefined' if the next key is outside the range. --spec next_range(keymapper(), [scalar_range()], key()) -> {key(), integer(), integer()} | undefined. -next_range(Keymapper, Filter0, PrevKey) -> - %% Key -> Vector -> +1 on vector -> Key - Filter = desugar_filter(Keymapper, Filter0), - PrevVec = key_to_vector(Keymapper, PrevKey), - case inc_vector(Filter, PrevVec) of - overflow -> - undefined; - NextVec -> - NewKey = vector_to_key(Keymapper, NextVec), - Bitmask = make_bitmask(Keymapper, Filter), - Bitfilter = NewKey band Bitmask, - {NewKey, Bitmask, Bitfilter} - end. - -spec bitstring_to_key(keymapper(), bitstring()) -> key(). bitstring_to_key(#keymapper{size = Size}, Bin) -> case Bin of @@ -280,60 +257,208 @@ bitstring_to_key(#keymapper{size = Size}, Bin) -> key_to_bitstring(#keymapper{size = Size}, Key) -> <>. +%% @doc Create a filter object that facilitates range scans. +-spec make_filter(keymapper(), [scalar_range()]) -> filter(). 
+make_filter(KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = Size}, Filter0) -> + NDim = length(DimSizeof), + %% Transform "symbolic" inequations to ranges: + Filter1 = inequations_to_ranges(KeyMapper, Filter0), + {Bitmask, Bitfilter} = make_bitfilter(KeyMapper, Filter1), + %% Calculate maximum source offset as per bitsource specification: + MaxOffset = lists:foldl( + fun({Dim, Offset, _Size}, Acc) -> + maps:update_with(Dim, fun(OldVal) -> max(OldVal, Offset) end, 0, Acc) + end, + #{}, + Schema + ), + %% Adjust minimum and maximum values for each interval like this: + %% + %% Min: 110100|101011 -> 110100|000000 + %% Max: 110101|001011 -> 110101|111111 + %% ^ + %% | + %% max offset + %% + %% This is needed so when we increment the vector, we always scan + %% the full range of least significant bits. + Filter2 = lists:map( + fun + ({{Val, Val}, _Dim}) -> + {Val, Val}; + ({{Min0, Max0}, Dim}) -> + Offset = maps:get(Dim, MaxOffset, 0), + %% Set least significant bits of Min to 0: + Min = (Min0 bsr Offset) bsl Offset, + %% Set least significant bits of Max to 1: + Max = Max0 bor ones(Offset), + {Min, Max} + end, + lists:zip(Filter1, lists:seq(1, NDim)) + ), + %% Project the vector into "bitsource coordinate system": + {_, Filter} = fold_bitsources( + fun(DstOffset, {Dim, SrcOffset, Size}, Acc) -> + {Min0, Max0} = lists:nth(Dim, Filter2), + Min = (Min0 bsr SrcOffset) band ones(Size), + Max = (Max0 bsr SrcOffset) band ones(Size), + Action = #filter_scan_action{ + offset = DstOffset, + size = Size, + min = Min, + max = Max + }, + [Action | Acc] + end, + [], + Schema + ), + Ranges = array:from_list(lists:reverse(Filter)), + %% Compute estimated upper and lower bounds of a _continuous_ + %% interval where all keys lie: + case Filter of + [] -> + RangeMin = 0, + RangeMax = 0; + [#filter_scan_action{offset = MSBOffset, min = MSBMin, max = MSBMax} | _] -> + RangeMin = MSBMin bsl MSBOffset, + RangeMax = MSBMax bsl MSBOffset bor ones(MSBOffset) + end, + %% 
Final value + #filter{ + size = Size, + bitmask = Bitmask, + bitfilter = Bitfilter, + bitsource_ranges = Ranges, + range_min = RangeMin, + range_max = RangeMax + }. + +-spec ratchet(filter(), key()) -> key() | overflow. +ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Max -> + NDim = array:size(Ranges), + case ratchet_scan(Ranges, NDim, Key, 0, _Pivot = {-1, 0}, _Carry = 0) of + overflow -> + overflow; + {Pivot, Increment} -> + ratchet_do(Ranges, Key, NDim - 1, Pivot, Increment) + end; +ratchet(_, _) -> + overflow. + +-spec bin_increment(filter(), binary()) -> binary() | overflow. +bin_increment(Filter = #filter{size = Size}, <<>>) -> + Key = ratchet(Filter, 0), + <>; +bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, KeyBin) -> + <> = KeyBin, + Key1 = Key0 + 1, + if + Key1 band Bitmask =:= Bitfilter -> + %% TODO: check overflow + <>; + true -> + case ratchet(Filter, Key1) of + overflow -> + overflow; + Key -> + <> + end + end. + +-spec bin_checkmask(filter(), binary()) -> boolean(). +bin_checkmask(#filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, Key) -> + case Key of + <> -> + Int band Bitmask =:= Bitfilter; + _ -> + false + end. + %%================================================================================ %% Internal functions %%================================================================================ --spec make_bitmask(keymapper(), [{non_neg_integer(), non_neg_integer()}]) -> non_neg_integer(). -make_bitmask(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) -> - BitmaskVector = lists:map( +%% Note: this function operates in bitsource basis, scanning it from 0 +%% to NDim (i.e. 
from the least significant bits to the most +%% significant bits) +ratchet_scan(_Ranges, NDim, _Key, NDim, Pivot, 0) -> + %% We've reached the end: + Pivot; +ratchet_scan(_Ranges, NDim, _Key, NDim, _Pivot, 1) -> + %% We've reached the end, but key is still not large enough: + overflow; +ratchet_scan(Ranges, NDim, Key, I, Pivot0, Carry) -> + #filter_scan_action{offset = Offset, size = Size, min = Min, max = Max} = array:get(I, Ranges), + %% Extract I-th element of the vector from the original key: + Elem = ((Key bsr Offset) band ones(Size)) + Carry, + if + Elem < Min -> + %% I-th coordinate is less than the specified minimum. + %% + %% We reset this coordinate to the minimum value. It means + %% we incremented this bitposition, the less significant + %% bits have to be reset to their respective minimum + %% values: + Pivot = {I + 1, 0}, + ratchet_scan(Ranges, NDim, Key, I + 1, Pivot, 0); + Elem > Max -> + %% I-th coordinate is larger than the specified + %% maximum. We can only fix this problem by incrementing + %% the next coordinate (i.e. more significant bits). + %% + %% We reset this coordinate to the minimum value, and + %% increment the next coordinate (by setting `Carry' to + %% 1). + Pivot = {I + 1, 1}, + ratchet_scan(Ranges, NDim, Key, I + 1, Pivot, 1); + true -> + %% Coordinate is within range: + ratchet_scan(Ranges, NDim, Key, I + 1, Pivot0, 0) + end. + +%% Note: this function operates in bitsource basis, scanning it from +%% NDim to 0. It applies the transformation specified by +%% `ratchet_scan'. 
+ratchet_do(Ranges, Key, I, _Pivot, _Increment) when I < 0 -> + 0; +ratchet_do(Ranges, Key, I, Pivot, Increment) -> + #filter_scan_action{offset = Offset, size = Size, min = Min} = array:get(I, Ranges), + Mask = ones(Offset + Size) bxor ones(Offset), + Elem = + if + I > Pivot -> + Mask band Key; + I =:= Pivot -> + (Mask band Key) + (Increment bsl Offset); + true -> + Min bsl Offset + end, + %% erlang:display( + %% {ratchet_do, I, integer_to_list(Key, 16), integer_to_list(Mask, 2), + %% integer_to_list(Elem, 16)} + %% ), + Elem bor ratchet_do(Ranges, Key, I - 1, Pivot, Increment). + +-spec make_bitfilter(keymapper(), [{non_neg_integer(), non_neg_integer()}]) -> + {non_neg_integer(), non_neg_integer()}. +make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) -> + L = lists:map( fun ({{N, N}, Bits}) -> %% For strict equality we can employ bitmask: - ones(Bits); + {ones(Bits), N}; (_) -> - 0 + {0, 0} end, lists:zip(Ranges, DimSizeof) ), - vector_to_key(Keymapper, BitmaskVector). - --spec inc_vector([{non_neg_integer(), non_neg_integer()}], vector()) -> vector() | overflow. -inc_vector(Filter, Vec0) -> - case normalize_vector(Filter, Vec0) of - {true, Vec} -> - Vec; - {false, Vec} -> - do_inc_vector(Filter, Vec, []) - end. - -do_inc_vector([], [], _Acc) -> - overflow; -do_inc_vector([{Min, Max} | Intervals], [Elem | Vec], Acc) -> - case Elem of - Max -> - do_inc_vector(Intervals, Vec, [Min | Acc]); - _ when Elem < Max -> - lists:reverse(Acc) ++ [Elem + 1 | Vec] - end. - -normalize_vector(Intervals, Vec0) -> - Vec = lists:map( - fun - ({{Min, _Max}, Elem}) when Min > Elem -> - Min; - ({{_Min, Max}, Elem}) when Max < Elem -> - Max; - ({_, Elem}) -> - Elem - end, - lists:zip(Intervals, Vec0) - ), - {Vec > Vec0, Vec}. + {Bitmask, Bitfilter} = lists:unzip(L), + {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}. %% Transform inequalities into a list of closed intervals that the %% vector elements should lie in. 
-desugar_filter(#keymapper{dim_sizeof = DimSizeof}, Filter) -> +inequations_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) -> lists:map( fun ({any, Bitsize}) -> @@ -390,24 +515,6 @@ ones(Bits) -> -ifdef(TEST). -%% %% Create a bitmask that is sufficient to cover a given number. E.g.: -%% %% -%% %% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111 -%% bitmask_of(N) -> -%% %% FIXME: avoid floats -%% NBits = ceil(math:log2(N + 1)), -%% ones(NBits). - -%% bitmask_of_test() -> -%% ?assertEqual(2#0, bitmask_of(0)), -%% ?assertEqual(2#1, bitmask_of(1)), -%% ?assertEqual(2#11, bitmask_of(2#10)), -%% ?assertEqual(2#11, bitmask_of(2#11)), -%% ?assertEqual(2#1111, bitmask_of(2#1000)), -%% ?assertEqual(2#1111, bitmask_of(2#1111)), -%% ?assertEqual(ones(128), bitmask_of(ones(128))), -%% ?assertEqual(ones(256), bitmask_of(ones(256))). - make_keymapper0_test() -> Schema = [], ?assertEqual( @@ -510,235 +617,117 @@ key_to_vector2_test() -> key2vec(Schema, [0, 1]), key2vec(Schema, [255, 0]). -inc_vector0_test() -> - Keymapper = make_keymapper([]), - ?assertMatch(overflow, incvec(Keymapper, [], [])). - -inc_vector1_test() -> - Keymapper = make_keymapper([{1, 0, 8}]), - ?assertMatch([3], incvec(Keymapper, [{'=', 3}], [1])), - ?assertMatch([3], incvec(Keymapper, [{'=', 3}], [2])), - ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [3])), - ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [4])), - ?assertMatch(overflow, incvec(Keymapper, [{'=', 3}], [255])), - %% Now with >=: - ?assertMatch([1], incvec(Keymapper, [{'>=', 0}], [0])), - ?assertMatch([255], incvec(Keymapper, [{'>=', 0}], [254])), - ?assertMatch(overflow, incvec(Keymapper, [{'>=', 0}], [255])), - - ?assertMatch([100], incvec(Keymapper, [{'>=', 100}], [0])), - ?assertMatch([100], incvec(Keymapper, [{'>=', 100}], [99])), - ?assertMatch([255], incvec(Keymapper, [{'>=', 100}], [254])), - ?assertMatch(overflow, incvec(Keymapper, [{'>=', 100}], [255])). 
- -inc_vector2_test() -> - Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}, {3, 0, 8}]), - Filter = [{'>=', 0}, {'=', 100}, {'>=', 30}], - ?assertMatch([0, 100, 30], incvec(Keymapper, Filter, [0, 0, 0])), - ?assertMatch([1, 100, 30], incvec(Keymapper, Filter, [0, 100, 30])), - ?assertMatch([255, 100, 30], incvec(Keymapper, Filter, [254, 100, 30])), - ?assertMatch([0, 100, 31], incvec(Keymapper, Filter, [255, 100, 30])), - ?assertMatch([0, 100, 30], incvec(Keymapper, Filter, [0, 100, 29])), - ?assertMatch(overflow, incvec(Keymapper, Filter, [255, 100, 255])), - ?assertMatch([255, 100, 255], incvec(Keymapper, Filter, [254, 100, 255])), - ?assertMatch([0, 100, 255], incvec(Keymapper, Filter, [255, 100, 254])), - %% Nasty cases (shouldn't happen, hopefully): - ?assertMatch([1, 100, 30], incvec(Keymapper, Filter, [0, 101, 0])), - ?assertMatch([1, 100, 33], incvec(Keymapper, Filter, [0, 101, 33])), - ?assertMatch([0, 100, 255], incvec(Keymapper, Filter, [255, 101, 254])), - ?assertMatch(overflow, incvec(Keymapper, Filter, [255, 101, 255])). - make_bitmask0_test() -> Keymapper = make_keymapper([]), - ?assertMatch(0, mkbmask(Keymapper, [])). + ?assertMatch({0, 0}, mkbmask(Keymapper, [])). make_bitmask1_test() -> Keymapper = make_keymapper([{1, 0, 8}]), - ?assertEqual(0, mkbmask(Keymapper, [any])), - ?assertEqual(16#ff, mkbmask(Keymapper, [{'=', 1}])), - ?assertEqual(16#ff, mkbmask(Keymapper, [{'=', 255}])), - ?assertEqual(0, mkbmask(Keymapper, [{'>=', 0}])), - ?assertEqual(0, mkbmask(Keymapper, [{'>=', 1}])), - ?assertEqual(0, mkbmask(Keymapper, [{'>=', 16#f}])). + ?assertEqual({0, 0}, mkbmask(Keymapper, [any])), + ?assertEqual({16#ff, 1}, mkbmask(Keymapper, [{'=', 1}])), + ?assertEqual({16#ff, 255}, mkbmask(Keymapper, [{'=', 255}])), + ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 0}])), + ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 1}])), + ?assertEqual({0, 0}, mkbmask(Keymapper, [{'>=', 16#f}])). 
make_bitmask2_test() -> Keymapper = make_keymapper([{1, 0, 3}, {2, 0, 4}, {3, 0, 2}]), - ?assertEqual(2#00_0000_000, mkbmask(Keymapper, [any, any, any])), - ?assertEqual(2#11_0000_000, mkbmask(Keymapper, [any, any, {'=', 0}])), - ?assertEqual(2#00_1111_000, mkbmask(Keymapper, [any, {'=', 0}, any])), - ?assertEqual(2#00_0000_111, mkbmask(Keymapper, [{'=', 0}, any, any])). + ?assertEqual({2#00_0000_000, 2#00_0000_000}, mkbmask(Keymapper, [any, any, any])), + ?assertEqual({2#11_0000_000, 2#00_0000_000}, mkbmask(Keymapper, [any, any, {'=', 0}])), + ?assertEqual({2#00_1111_000, 2#00_0000_000}, mkbmask(Keymapper, [any, {'=', 0}, any])), + ?assertEqual({2#00_0000_111, 2#00_0000_000}, mkbmask(Keymapper, [{'=', 0}, any, any])). make_bitmask3_test() -> %% Key format of type |TimeOffset|Topic|Epoch|: - Keymapper = make_keymapper([{1, 8, 8}, {2, 0, 8}, {1, 0, 8}]), - ?assertEqual(2#00000000_00000000_00000000, mkbmask(Keymapper, [any, any])), - ?assertEqual(2#11111111_11111111_11111111, mkbmask(Keymapper, [{'=', 33}, {'=', 22}])), - ?assertEqual(2#11111111_11111111_11111111, mkbmask(Keymapper, [{'=', 33}, {'=', 22}])), - ?assertEqual(2#00000000_11111111_00000000, mkbmask(Keymapper, [{'>=', 255}, {'=', 22}])). + Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}, {1, 8, 8}]), + ?assertEqual({2#00000000_00000000_00000000, 16#00_00_00}, mkbmask(Keymapper, [any, any])), + ?assertEqual( + {2#11111111_11111111_11111111, 16#aa_cc_bb}, + mkbmask(Keymapper, [{'=', 16#aabb}, {'=', 16#cc}]) + ), + ?assertEqual( + {2#00000000_11111111_00000000, 16#00_bb_00}, mkbmask(Keymapper, [{'>=', 255}, {'=', 16#bb}]) + ). -next_range0_test() -> - Keymapper = make_keymapper([]), +make_filter_test() -> + KeyMapper = make_keymapper([]), Filter = [], - PrevKey = 0, - ?assertMatch(undefined, next_range(Keymapper, Filter, PrevKey)). + ?assertMatch(#filter{size = 0, bitmask = 0, bitfilter = 0}, make_filter(KeyMapper, Filter)). 
-next_range1_test() -> - Keymapper = make_keymapper([{1, 0, 8}, {2, 0, 8}]), - ?assertMatch(undefined, next_range(Keymapper, [{'=', 0}, {'=', 0}], 0)), - ?assertMatch({1, 16#ffff, 1}, next_range(Keymapper, [{'=', 1}, {'=', 0}], 0)), - ?assertMatch({16#100, 16#ffff, 16#100}, next_range(Keymapper, [{'=', 0}, {'=', 1}], 0)), - %% Now with any: - ?assertMatch({1, 0, 0}, next_range(Keymapper, [any, any], 0)), - ?assertMatch({2, 0, 0}, next_range(Keymapper, [any, any], 1)), - ?assertMatch({16#fffb, 0, 0}, next_range(Keymapper, [any, any], 16#fffa)), - %% Now with >=: +ratchet1_test() -> + Bitsources = [{1, 0, 8}], + M = make_keymapper(Bitsources), + F = make_filter(M, [any]), + #filter{bitsource_ranges = Rarr} = F, ?assertMatch( - {16#42_30, 16#ff00, 16#42_00}, next_range(Keymapper, [{'>=', 16#30}, {'=', 16#42}], 0) - ), - ?assertMatch( - {16#42_31, 16#ff00, 16#42_00}, - next_range(Keymapper, [{'>=', 16#30}, {'=', 16#42}], 16#42_30) + [ + #filter_scan_action{ + offset = 0, + size = 8, + min = 0, + max = 16#ff + } + ], + array:to_list(Rarr) ), + ?assertEqual(0, ratchet(F, 0)), + ?assertEqual(16#fa, ratchet(F, 16#fa)), + ?assertEqual(16#ff, ratchet(F, 16#ff)), + ?assertEqual(overflow, ratchet(F, 16#100), "TBD: filter must store the upper bound"). - ?assertMatch( - {16#30_42, 16#00ff, 16#00_42}, next_range(Keymapper, [{'=', 16#42}, {'>=', 16#30}], 0) - ), - ?assertMatch( - {16#31_42, 16#00ff, 16#00_42}, - next_range(Keymapper, [{'=', 16#42}, {'>=', 16#30}], 16#00_43) - ). 
+%% erlfmt-ignore +ratchet2_test() -> + Bitsources = [{1, 0, 8}, %% Static topic index + {2, 8, 8}, %% Epoch + {3, 0, 8}, %% Varying topic hash + {2, 0, 8}], %% Timestamp offset + M = make_keymapper(lists:reverse(Bitsources)), + F1 = make_filter(M, [{'=', 16#aa}, any, {'=', 16#cc}]), + ?assertEqual(16#aa00cc00, ratchet(F1, 0)), + ?assertEqual(16#aa01cc00, ratchet(F1, 16#aa00cd00)), + ?assertEqual(16#aa01cc11, ratchet(F1, 16#aa01cc11)), + ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10cd00)), + ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10dc11)), + ?assertEqual(overflow, ratchet(F1, 16#ab000000)), + F2 = make_filter(M, [{'=', 16#aa}, {'>=', 16#dddd}, {'=', 16#cc}]), + ?assertEqual(16#aaddcc00, ratchet(F2, 0)), + ?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)). -%% Bunch of tests that verifying that next_range doesn't skip over keys: +ratchet3_test() -> + ?assert(proper:quickcheck(ratchet1_prop(), 100)). --define(assertIterComplete(A, B), - ?assertEqual(A -- [0], B) -). +%% erlfmt-ignore +ratchet1_prop() -> + EpochBits = 4, + Bitsources = [{1, 0, 2}, %% Static topic index + {2, EpochBits, 4}, %% Epoch + {3, 0, 2}, %% Varying topic hash + {2, 0, EpochBits}], %% Timestamp offset + M = make_keymapper(lists:reverse(Bitsources)), + F1 = make_filter(M, [{'=', 2#10}, any, {'=', 2#01}]), + ?FORALL(N, integer(0, ones(12)), + ratchet_prop(F1, N)). --define(assertSameSet(A, B), - ?assertIterComplete(lists:sort(A), lists:sort(B)) -). - -iterate1_test() -> - SizeX = 3, - SizeY = 3, - Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]), - Keys = test_iteration(Keymapper, [any, any]), - Expected = [ - X bor (Y bsl SizeX) - || Y <- lists:seq(0, ones(SizeY)), X <- lists:seq(0, ones(SizeX)) - ], - ?assertIterComplete(Expected, Keys). 
- -iterate2_test() -> - SizeX = 64, - SizeY = 3, - Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]), - X = 123456789, - Keys = test_iteration(Keymapper, [{'=', X}, any]), - Expected = [ - X bor (Y bsl SizeX) - || Y <- lists:seq(0, ones(SizeY)) - ], - ?assertIterComplete(Expected, Keys). - -iterate3_test() -> - SizeX = 3, - SizeY = 64, - Y = 42, - Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]), - Keys = test_iteration(Keymapper, [any, {'=', Y}]), - Expected = [ - X bor (Y bsl SizeX) - || X <- lists:seq(0, ones(SizeX)) - ], - ?assertIterComplete(Expected, Keys). - -iterate4_test() -> - SizeX = 8, - SizeY = 4, - MinX = 16#fa, - MinY = 16#a, - Keymapper = make_keymapper([{1, 0, SizeX}, {2, 0, SizeY}]), - Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]), - Expected = [ - X bor (Y bsl SizeX) - || Y <- lists:seq(MinY, ones(SizeY)), X <- lists:seq(MinX, ones(SizeX)) - ], - ?assertIterComplete(Expected, Keys). - -iterate1_prop() -> - Size = 4, - ?FORALL( - {SizeX, SizeY}, - {integer(1, Size), integer(1, Size)}, - ?FORALL( - {SplitX, MinX, MinY}, - {integer(0, SizeX), integer(0, SizeX), integer(0, SizeY)}, - begin - Keymapper = make_keymapper([ - {1, 0, SplitX}, {2, 0, SizeY}, {1, SplitX, SizeX - SplitX} - ]), - Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]), - Expected = [ - vector_to_key(Keymapper, [X, Y]) - || X <- lists:seq(MinX, ones(SizeX)), - Y <- lists:seq(MinY, ones(SizeY)) - ], - ?assertSameSet(Expected, Keys), - true - end - ) - ). - -iterate5_test() -> - ?assert(proper:quickcheck(iterate1_prop(), 100)). 
- -iterate2_prop() -> - Size = 4, - ?FORALL( - {SizeX, SizeY}, - {integer(1, Size), integer(1, Size)}, - ?FORALL( - {SplitX, MinX, MinY}, - {integer(0, SizeX), integer(0, SizeX), integer(0, SizeY)}, - begin - Keymapper = make_keymapper([ - {1, SplitX, SizeX - SplitX}, {2, 0, SizeY}, {1, 0, SplitX} - ]), - Keys = test_iteration(Keymapper, [{'>=', MinX}, {'>=', MinY}]), - Expected = [ - vector_to_key(Keymapper, [X, Y]) - || X <- lists:seq(MinX, ones(SizeX)), - Y <- lists:seq(MinY, ones(SizeY)) - ], - ?assertSameSet(Expected, Keys), - true - end - ) - ). - -iterate6_test() -> - ?assert(proper:quickcheck(iterate2_prop(), 1000)). - -test_iteration(Keymapper, Filter) -> - test_iteration(Keymapper, Filter, 0). - -test_iteration(Keymapper, Filter, PrevKey) -> - case next_range(Keymapper, Filter, PrevKey) of - undefined -> - []; - {Key, Bitmask, Bitfilter} -> - ?assert((Key band Bitmask) =:= Bitfilter), - [Key | test_iteration(Keymapper, Filter, Key)] - end. +ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0) -> + Key = ratchet(Filter, Key0), + ?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)), + ?assert(Key >= Key0, {Key, '>=', Key}), + IMax = ones(Size), + CheckGaps = fun + F(I) when I >= Key; I > IMax -> + true; + F(I) -> + ?assertNot( + I band Bitmask =:= Bitfilter, + {found_gap, Key0, I, Key} + ), + F(I + 1) + end, + CheckGaps(Key0). mkbmask(Keymapper, Filter0) -> - Filter = desugar_filter(Keymapper, Filter0), - make_bitmask(Keymapper, Filter). - -incvec(Keymapper, Filter0, Vector) -> - Filter = desugar_filter(Keymapper, Filter0), - inc_vector(Filter, Vector). + Filter = inequations_to_ranges(Keymapper, Filter0), + make_bitfilter(Keymapper, Filter). 
key2vec(Schema, Vector) -> Keymapper = make_keymapper(Schema), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 7b8fbab0d..8d406c93e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -30,7 +30,7 @@ -export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). %% internal exports: --export([]). +-export([format_key/2, format_keyfilter/1]). -export_type([options/0]). @@ -73,8 +73,7 @@ topic_filter :: emqx_ds:topic_filter(), start_time :: emqx_ds:time(), storage_key :: emqx_ds_lts:msg_storage_key(), - last_seen_key = 0 :: emqx_ds_bitmask_keymapper:key(), - key_filter :: [emqx_ds_bitmask_keymapper:scalar_range()] + last_seen_key = <<>> :: binary() }). -define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER), @@ -83,6 +82,8 @@ -define(COUNTER, emqx_ds_storage_bitfield_lts_counter). +-include("emqx_ds_bitmask.hrl"). 
+ %%================================================================================ %% API funcions %%================================================================================ @@ -95,7 +96,8 @@ create(_ShardId, DBHandle, GenId, Options) -> %% Get options: BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), TopicIndexBytes = maps:get(topic_index_bytes, Options, 4), - TSOffsetBits = maps:get(epoch_bits, Options, 8), %% TODO: change to 10 to make it around ~1 sec + %% 10 bits -> 1024 ms -> ~1 sec + TSOffsetBits = maps:get(epoch_bits, Options, 10), %% Create column families: DataCFName = data_cf(GenId), TrieCFName = trie_cf(GenId), @@ -120,17 +122,17 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), Trie = restore_trie(TopicIndexBytes, DBHandle, TrieCF), - %% If user's topics have more than learned 10 wildcard levels, - %% then it's total carnage; learned topic structure won't help - %% much: + %% If user's topics have more than learned 10 wildcard levels + %% (more than 2, really), then it's total carnage; learned topic + %% structure won't help. MaxWildcardLevels = 10, - Keymappers = array:from_list( + KeymapperCache = array:from_list( [ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) || N <- lists:seq(0, MaxWildcardLevels) ] ), - #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = Keymappers}. + #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}. store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> lists:foreach( @@ -144,16 +146,26 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter), - [ - #stream{ - storage_key = I - } - || I <- Indexes - ]. + [#stream{storage_key = I} || I <- Indexes]. 
make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, StartTime) -> + %% Note: it's a good idea to keep the iterator structure lean, + %% since it can be stored on a remote node that could update its + %% code independently from us. + {ok, #it{ + topic_filter = TopicFilter, + start_time = StartTime, + storage_key = StorageKey + }}. + +next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> + #it{ + start_time = StartTime, + storage_key = StorageKey + } = It0, + %% Make filter: {TopicIndex, Varying} = StorageKey, - Filter = [ + Inequations = [ {'=', TopicIndex}, {'>=', StartTime} | lists:map( @@ -166,29 +178,22 @@ make_iterator(_Shard, _Data, #stream{storage_key = StorageKey}, TopicFilter, Sta Varying ) ], - {ok, #it{ - topic_filter = TopicFilter, - start_time = StartTime, - storage_key = StorageKey, - key_filter = Filter - }}. - -next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> - #it{ - key_filter = KeyFilter - } = It0, - % TODO: ugh, so ugly - NVarying = length(KeyFilter) - 2, + %% Obtain a keymapper for the current number of varying + %% levels. Magic constant 2: we have two extra dimensions of topic + %% index and time; the rest of dimensions are varying levels. 
+ NVarying = length(Inequations) - 2, Keymapper = array:get(NVarying, Keymappers), - %% Calculate lower and upper bounds for iteration: - LowerBound = lower_bound(Keymapper, KeyFilter), - UpperBound = upper_bound(Keymapper, KeyFilter), + Filter = + #filter{range_min = LowerBound, range_max = UpperBound} = emqx_ds_bitmask_keymapper:make_filter( + Keymapper, Inequations + ), {ok, ITHandle} = rocksdb:iterator(DB, CF, [ - {iterate_lower_bound, LowerBound}, {iterate_upper_bound, UpperBound} + {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)}, + {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound)} ]), try put(?COUNTER, 0), - next_loop(ITHandle, Keymapper, It0, [], BatchSize) + next_loop(ITHandle, Keymapper, Filter, It0, [], BatchSize) after rocksdb:iterator_close(ITHandle), erase(?COUNTER) @@ -198,100 +203,64 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> %% Internal functions %%================================================================================ -next_loop(_, _, It, Acc, 0) -> +next_loop(ITHandle, KeyMapper, Filter, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; -next_loop(ITHandle, KeyMapper, It0 = #it{last_seen_key = Key0, key_filter = KeyFilter}, Acc0, N0) -> +next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) -> inc_counter(), - case next_range(KeyMapper, It0) of - {Key1, Bitmask, Bitfilter} when Key1 > Key0 -> - case iterator_move(KeyMapper, ITHandle, {seek, Key1}) of - {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) -> - assert_progress(bitmask_match, KeyMapper, KeyFilter, Key0, Key1), - Msg = deserialize(Val), + #it{last_seen_key = Key0} = It0, + case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of + overflow -> + {ok, It0, lists:reverse(Acc0)}; + Key1 -> + %% assert + true = Key1 > Key0, + case rocksdb:iterator_move(ITHandle, {seek, Key1}) of + {ok, Key, Val} -> It1 = It0#it{last_seen_key = Key}, - case 
check_message(It1, Msg) of - true -> + case check_message(Filter, It1, Val) of + {true, Msg} -> N1 = N0 - 1, Acc1 = [Msg | Acc0]; false -> N1 = N0, Acc1 = Acc0 end, - {N, It, Acc} = traverse_interval( - ITHandle, KeyMapper, Bitmask, Bitfilter, It1, Acc1, N1 - ), - next_loop(ITHandle, KeyMapper, It, Acc, N); - {ok, Key, _Val} -> - assert_progress(bitmask_miss, KeyMapper, KeyFilter, Key0, Key1), - It = It0#it{last_seen_key = Key}, - next_loop(ITHandle, KeyMapper, It, Acc0, N0); + {N, It, Acc} = traverse_interval(ITHandle, KeyMapper, Filter, It1, Acc1, N1), + next_loop(ITHandle, KeyMapper, Filter, It, Acc, N); {error, invalid_iterator} -> {ok, It0, lists:reverse(Acc0)} - end; - _ -> - {ok, It0, lists:reverse(Acc0)} + end end. -traverse_interval(_, _, _, _, It, Acc, 0) -> +traverse_interval(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) -> {0, It, Acc}; -traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It0, Acc, N) -> +traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) -> inc_counter(), - case iterator_move(KeyMapper, ITHandle, next) of - {ok, Key, Val} when ?QUICKCHECK_KEY(Key, Bitmask, Bitfilter) -> - Msg = deserialize(Val), + case rocksdb:iterator_move(ITHandle, next) of + {ok, Key, Val} -> It = It0#it{last_seen_key = Key}, - case check_message(It, Msg) of - true -> - traverse_interval( - ITHandle, KeyMapper, Bitmask, Bitfilter, It, [Msg | Acc], N - 1 - ); + case check_message(Filter, It, Val) of + {true, Msg} -> + traverse_interval(ITHandle, KeyMapper, Filter, It, [Msg | Acc], N - 1); false -> - traverse_interval(ITHandle, KeyMapper, Bitmask, Bitfilter, It, Acc, N) + traverse_interval(ITHandle, KeyMapper, Filter, It, Acc, N) end; - {ok, Key, _Val} -> - It = It0#it{last_seen_key = Key}, - {N, It, Acc}; {error, invalid_iterator} -> {0, It0, Acc} end. -next_range(KeyMapper, #it{key_filter = KeyFilter, last_seen_key = PrevKey}) -> - emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, PrevKey). - -check_message(_Iterator, _Msg) -> - %% TODO. 
- true. - -iterator_move(KeyMapper, ITHandle, Action0) -> - Action = - case Action0 of - next -> - next; - {seek, Int} -> - {seek, emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Int)} - end, - case rocksdb:iterator_move(ITHandle, Action) of - {ok, KeyBin, Val} -> - {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin), Val}; - {ok, KeyBin} -> - {ok, emqx_ds_bitmask_keymapper:bitstring_to_key(KeyMapper, KeyBin)}; - Other -> - Other +-spec check_message(emqx_ds_bitmask_keymapper:filter(), #it{}, binary()) -> + {true, #message{}} | false. +check_message(Filter, #it{last_seen_key = Key}, Val) -> + case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of + true -> + Msg = deserialize(Val), + %% TODO: check strict time and hash collisions + {true, Msg}; + false -> + false end. -assert_progress(_Msg, _KeyMapper, _KeyFilter, Key0, Key1) when Key1 > Key0 -> - ?tp_ignore_side_effects_in_prod( - emqx_ds_storage_bitfield_lts_iter_move, - #{ location => _Msg - , key0 => format_key(_KeyMapper, Key0) - , key1 => format_key(_KeyMapper, Key1) - }), - ok; -assert_progress(Msg, KeyMapper, KeyFilter, Key0, Key1) -> - Str0 = format_key(KeyMapper, Key0), - Str1 = format_key(KeyMapper, Key1), - error(#{'$msg' => Msg, key0 => Str0, key1 => Str1, step => get(?COUNTER), keyfilter => lists:map(fun format_keyfilter/1, KeyFilter)}). - format_key(KeyMapper, Key) -> Vec = [integer_to_list(I, 16) || I <- emqx_ds_bitmask_keymapper:key_to_vector(KeyMapper, Key)], lists:flatten(io_lib:format("~.16B (~s)", [Key, string:join(Vec, ",")])). @@ -357,16 +326,6 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> end, Keymapper. -upper_bound(Keymapper, [TopicIndex | Rest]) -> - filter_to_key(Keymapper, [TopicIndex | [{'=', infinity} || _ <- Rest]]). - -lower_bound(Keymapper, [TopicIndex | Rest]) -> - filter_to_key(Keymapper, [TopicIndex | [{'=', 0} || _ <- Rest]]). 
- -filter_to_key(KeyMapper, KeyFilter) -> - {Key, _, _} = emqx_ds_bitmask_keymapper:next_range(KeyMapper, KeyFilter, 0), - emqx_ds_bitmask_keymapper:key_to_bitstring(KeyMapper, Key). - -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). restore_trie(TopicIndexBytes, DB, CF) -> PersistCallback = fun(Key, Val) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ deleted file mode 100644 index 32f18d18b..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ +++ /dev/null @@ -1,714 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_storage_layer). - --behaviour(gen_server). - -%% API: --export([start_link/2]). --export([create_generation/3]). - --export([open_shard/2, get_streams/3]). --export([message_store/3]). --export([delete/4]). - --export([make_iterator/3, next/1, next/2]). - --export([ - preserve_iterator/2, - restore_iterator/2, - discard_iterator/2, - ensure_iterator/3, - discard_iterator_prefix/2, - list_iterator_prefix/2, - foldl_iterator_prefix/4 -]). - -%% gen_server callbacks: --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - --export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]). --export_type([db_options/0, db_write_options/0, db_read_options/0]). - --compile({inline, [meta_lookup/2]}). - --include_lib("emqx/include/emqx.hrl"). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --type options() :: #{ - dir => file:filename() -}. - -%% see rocksdb:db_options() --type db_options() :: proplists:proplist(). 
-%% see rocksdb:write_options() --type db_write_options() :: proplists:proplist(). -%% see rocksdb:read_options() --type db_read_options() :: proplists:proplist(). - --type cf_refs() :: [{string(), rocksdb:cf_handle()}]. - -%% Message storage generation -%% Keep in mind that instances of this type are persisted in long-term storage. --type generation() :: #{ - %% Module that handles data for the generation - module := module(), - %% Module-specific data defined at generation creation time - data := term(), - %% When should this generation become active? - %% This generation should only contain messages timestamped no earlier than that. - %% The very first generation will have `since` equal 0. - since := emqx_ds:time() -}. - --record(s, { - shard :: emqx_ds:shard(), - keyspace :: emqx_ds_conf:keyspace(), - db :: rocksdb:db_handle(), - cf_iterator :: rocksdb:cf_handle(), - cf_generations :: cf_refs() -}). - --record(stream, - { generation :: gen_id() - , topic_filter :: emqx_ds:topic_filter() - , since :: emqx_ds:time() - , enc :: _EncapsultatedData - }). - --opaque stream() :: #stream{}. - --record(it, { - shard :: emqx_ds:shard(), - gen :: gen_id(), - replay :: emqx_ds:replay(), - module :: module(), - data :: term() -}). - --type gen_id() :: 0..16#ffff. - --opaque state() :: #s{}. --opaque iterator() :: #it{}. - -%% Contents of the default column family: -%% -%% [{<<"genNN">>, #generation{}}, ..., -%% {<<"current">>, GenID}] - --define(DEFAULT_CF, "default"). --define(DEFAULT_CF_OPTS, []). - --define(ITERATOR_CF, "$iterators"). - -%% TODO -%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`. -%% 2. Supposedly might be compressed _very_ effectively. -%% 3. `inplace_update_support`? --define(ITERATOR_CF_OPTS, []). - --define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). 
- -%%================================================================================ -%% Callbacks -%%================================================================================ - --callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> - {_Schema, cf_refs()}. - --callback open( - emqx_ds:shard(), - rocksdb:db_handle(), - gen_id(), - cf_refs(), - _Schema -) -> - _DB. - --callback store( - _DB, - _MessageID :: binary(), - emqx_ds:time(), - emqx_ds:topic(), - _Payload :: binary() -) -> - ok | {error, _}. - --callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) -> - ok | {error, _}. - --callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) -> - [{_TopicRankX, _Stream}]. - --callback make_iterator(_DB, emqx_ds:replay()) -> - {ok, _It} | {error, _}. - --callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}. - --callback preserve_iterator(_It) -> term(). - --callback next(It) -> {value, binary(), It} | none | {error, closed}. - -%%================================================================================ -%% Replication layer API -%%================================================================================ - --spec open_shard(emqx_ds_replication_layer:shard(), emqx_ds_storage_layer:options()) -> ok. -open_shard(Shard, Options) -> - emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). - --spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), _Stream}]. -get_streams(Shard, TopicFilter, StartTime) -> - %% TODO: lookup ALL generations - {GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, StartTime), - lists:map( - fun({RankX, ModStream}) -> - Stream = #stream{ generation = GenId - , topic_filter = TopicFilter - , since = StartTime - , enc = ModStream - }, - Rank = {RankX, GenId}, - {Rank, Stream} - end, - Mod:get_streams(ModState, TopicFilter, StartTime)). 
- --spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - {ok, _MessageId} | {error, _}. -message_store(Shard, Msgs, _Opts) -> - {ok, lists:map( - fun(Msg) -> - GUID = emqx_message:id(Msg), - Timestamp = Msg#message.timestamp, - {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp), - Topic = emqx_topic:words(emqx_message:topic(Msg)), - Payload = serialize(Msg), - Mod:store(ModState, GUID, Timestamp, Topic, Payload), - GUID - end, - Msgs)}. - --spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream. -next(It = #it{}) -> - next(It, _BatchSize = 1). - --spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream. -next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) -> - end_of_stream; -next( - It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize -) -> - #{data := DBData} = meta_get_gen(Shard, Gen), - {ok, ItData} = Mod:restore_iterator(DBData, Serialized), - next(It#it{data = ItData}, BatchSize); -next(It = #it{}, BatchSize) -> - do_next(It, BatchSize, _Acc = []). - -%%================================================================================ -%% API functions -%%================================================================================ - --spec create_generation( - emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() -) -> - {ok, gen_id()} | {error, nonmonotonic}. -create_generation(ShardId, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(ShardId), {create_generation, Since, Config}). - --spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> - ok | {error, _}. -delete(Shard, GUID, Time, Topic) -> - {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), - Mod:delete(Data, GUID, Time, Topic). - --spec make_iterator(emqx_ds:shard(), stream(), emqx_ds:time()) -> - {ok, iterator()} | {error, _TODO}. 
-make_iterator(Shard, Stream, StartTime) -> - #stream{ topic_filter = TopicFilter - , since = Since - , enc = Enc - } = Stream, - {GenId, Gen} = meta_lookup_gen(Shard, StartTime), - Replay = {TopicFilter, Since}, - case Mod:make_iterator(Data, Replay, Options) of - #it{ gen = GenId, - replay = {TopicFilter, Since} - }. - --spec do_next(iterator(), non_neg_integer(), [binary()]) -> - {ok, iterator(), [binary()]} | end_of_stream. -do_next(It, N, Acc) when N =< 0 -> - {ok, It, lists:reverse(Acc)}; -do_next(It = #it{module = Mod, data = ItData}, N, Acc) -> - case Mod:next(ItData) of - {value, Bin, ItDataNext} -> - Val = deserialize(Bin), - do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]); - {error, _} = _Error -> - %% todo: log? - %% iterator might be invalid now; will need to re-open it. - Serialized = Mod:preserve_iterator(ItData), - {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; - none -> - case open_next_iterator(It) of - {ok, ItNext} -> - do_next(ItNext, N, Acc); - {error, _} = _Error -> - %% todo: log? - %% fixme: only bad options may lead to this? - %% return an "empty" iterator to be re-opened when retrying? - Serialized = Mod:preserve_iterator(ItData), - {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; - none -> - case Acc of - [] -> - end_of_stream; - _ -> - {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)} - end - end - end. - --spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> - ok | {error, _TODO}. -preserve_iterator(It = #it{}, IteratorID) -> - iterator_put_state(IteratorID, It). - --spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> - {ok, iterator()} | {error, _TODO}. -restore_iterator(Shard, ReplayID) -> - case iterator_get_state(Shard, ReplayID) of - {ok, Serial} -> - restore_iterator_state(Shard, Serial); - not_found -> - {error, not_found}; - {error, _Reason} = Error -> - Error - end. 
- --spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> - {ok, iterator()} | {error, _TODO}. -ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> - case restore_iterator(Shard, IteratorID) of - {ok, It} -> - {ok, It}; - {error, not_found} -> - {ok, It} = make_iterator(Shard, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - {ok, It}; - Error -> - Error - end. - --spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> - ok | {error, _TODO}. -discard_iterator(Shard, ReplayID) -> - iterator_delete(Shard, ReplayID). - --spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> - ok | {error, _TODO}. -discard_iterator_prefix(Shard, KeyPrefix) -> - case do_discard_iterator_prefix(Shard, KeyPrefix) of - {ok, _} -> ok; - Error -> Error - end. - --spec list_iterator_prefix( - emqx_ds:shard(), - binary() -) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. -list_iterator_prefix(Shard, KeyPrefix) -> - do_list_iterator_prefix(Shard, KeyPrefix). - --spec foldl_iterator_prefix( - emqx_ds:shard(), - binary(), - fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), - Acc -) -> {ok, Acc} | {error, _TODO} when - Acc :: term(). -foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). - -%%================================================================================ -%% gen_server -%%================================================================================ - --spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> - {ok, pid()}. -start_link(Shard, Options) -> - gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). - -init({Shard, Options}) -> - process_flag(trap_exit, true), - {ok, S0} = do_open_db(Shard, Options), - S = ensure_current_generation(S0), - ok = populate_metadata(S), - {ok, S}. 
- -handle_call({create_generation, Since, Config}, _From, S) -> - case create_new_gen(Since, Config, S) of - {ok, GenId, NS} -> - {reply, {ok, GenId}, NS}; - {error, _} = Error -> - {reply, Error, S} - end; -handle_call(_Call, _From, S) -> - {reply, {error, unknown_call}, S}. - -handle_cast(_Cast, S) -> - {noreply, S}. - -handle_info(_Info, S) -> - {noreply, S}. - -terminate(_Reason, #s{db = DB, shard = Shard}) -> - meta_erase(Shard), - ok = rocksdb:close(DB). - -%%================================================================================ -%% Internal functions -%%================================================================================ - --record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). - --spec populate_metadata(state()) -> ok. -populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) -> - ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}), - Current = schema_get_current(DBHandle), - lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). - --spec populate_metadata(gen_id(), state()) -> ok. -populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> - Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), - meta_register_gen(Shard, GenId, Gen). - --spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) -> - case schema_get_current(DBHandle) of - undefined -> - Config = emqx_ds_conf:keyspace_config(Keyspace), - {ok, _, NS} = create_new_gen(0, Config, S), - NS; - _GenId -> - S - end. - --spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> - {ok, gen_id(), state()} | {error, nonmonotonic}. 
-create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> - GenId = get_next_id(meta_get_current(Shard)), - GenId = get_next_id(schema_get_current(DBHandle)), - case is_gen_valid(Shard, GenId, Since) of - ok -> - {ok, Gen, NS} = create_gen(GenId, Since, Config, S), - %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, Gen), - ok = schema_put_current(DBHandle, GenId), - ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)), - {ok, GenId, NS}; - {error, _} = Error -> - Error - end. - --spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> - {ok, generation(), state()}. -create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> - % TODO: Backend implementation should ensure idempotency. - {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), - Gen = #{ - module => Module, - data => Schema, - since => Since - }, - {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. - --spec do_open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -do_open_db(Shard, Options) -> - DefaultDir = binary_to_list(Shard), - DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), - %% TODO: properly forward keyspace - Keyspace = maps:get(keyspace, Options, default_keyspace), - DBOptions = [ - {create_if_missing, true}, - {create_missing_column_families, true} - | emqx_ds_conf:db_options(Keyspace) - ], - _ = filelib:ensure_dir(DBDir), - ExistingCFs = - case rocksdb:list_column_families(DBDir, DBOptions) of - {ok, CFs} -> - [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; - % DB is not present. 
First start - {error, {db_open, _}} -> - [] - end, - ColumnFamilies = [ - {?DEFAULT_CF, ?DEFAULT_CF_OPTS}, - {?ITERATOR_CF, ?ITERATOR_CF_OPTS} - | ExistingCFs - ], - case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of - {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> - {CFNames, _} = lists:unzip(ExistingCFs), - {ok, #s{ - shard = Shard, - keyspace = Keyspace, - db = DBHandle, - cf_iterator = CFIterator, - cf_generations = lists:zip(CFNames, CFRefs) - }}; - Error -> - Error - end. - --spec open_gen(gen_id(), generation(), state()) -> generation(). -open_gen( - GenId, - Gen = #{module := Mod, data := Data}, - #s{shard = Shard, db = DBHandle, cf_generations = CFs} -) -> - DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), - Gen#{data := DB}. - --spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. -open_next_iterator(It = #it{shard = Shard, gen = GenId}) -> - open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}). - -open_next_iterator(undefined, _It) -> - none; -open_next_iterator(Gen = #{}, It) -> - open_iterator(Gen, It). - --spec open_restore_iterator(generation(), iterator(), binary()) -> - {ok, iterator()} | {error, _Reason}. -open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) -> - case Mod:restore_iterator(Data, Serial) of - {ok, ItData} -> - {ok, It#it{module = Mod, data = ItData}}; - Err -> - Err - end. - -%% - --define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). --define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin - <> = (KeyReplayState), - IteratorId -end). - --define(ITERATION_WRITE_OPTS, []). --define(ITERATION_READ_OPTS, []). - -iterator_get_state(Shard, ReplayID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS). 
- -iterator_put_state(ID, It = #it{shard = Shard}) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - Serial = preserve_iterator_state(It), - rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS). - -iterator_delete(Shard, ID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS). - -preserve_iterator_state(#it{ - gen = Gen, - replay = {TopicFilter, StartTime}, - module = Mod, - data = ItData -}) -> - term_to_binary(#{ - v => 1, - gen => Gen, - filter => TopicFilter, - start => StartTime, - st => Mod:preserve_iterator(ItData) - }). - -restore_iterator_state(Shard, Serial) when is_binary(Serial) -> - restore_iterator_state(Shard, binary_to_term(Serial)); -restore_iterator_state( - Shard, - #{ - v := 1, - gen := Gen, - filter := TopicFilter, - start := StartTime, - st := State - } -) -> - It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, - open_restore_iterator(meta_get_gen(Shard, Gen), It, State). - -do_list_iterator_prefix(Shard, KeyPrefix) -> - Fn = fun(K0, _V, Acc) -> - K = ?KEY_REPLAY_STATE_PAT(K0), - [K | Acc] - end, - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). - -do_discard_iterator_prefix(Shard, KeyPrefix) -> - #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), - Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). - -do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of - {ok, It} -> - NextAction = {seek, KeyPrefix}, - do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); - Error -> - Error - end. 
- -do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> - case rocksdb:iterator_move(It, NextAction) of - {ok, K = <>, V} -> - NewAcc = Fn(K, V, Acc), - do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); - {ok, _K, _V} -> - ok = rocksdb:iterator_close(It), - {ok, Acc}; - {error, invalid_iterator} -> - ok = rocksdb:iterator_close(It), - {ok, Acc}; - Error -> - ok = rocksdb:iterator_close(It), - Error - end. - -%% Functions for dealing with the metadata stored persistently in rocksdb - --define(CURRENT_GEN, <<"current">>). --define(SCHEMA_WRITE_OPTS, []). --define(SCHEMA_READ_OPTS, []). - --spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). -schema_get_gen(DBHandle, GenId) -> - {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), - binary_to_term(Bin). - --spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. -schema_put_gen(DBHandle, GenId, Gen) -> - rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). - --spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined. -schema_get_current(DBHandle) -> - case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of - {ok, Bin} -> - binary_to_integer(Bin); - not_found -> - undefined - end. - --spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}. -schema_put_current(DBHandle, GenId) -> - rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS). - --spec schema_gen_key(integer()) -> binary(). -schema_gen_key(N) -> - <<"gen", N:32>>. - --undef(CURRENT_GEN). --undef(SCHEMA_WRITE_OPTS). --undef(SCHEMA_READ_OPTS). - -%% Functions for dealing with the runtime shard metadata: - --define(PERSISTENT_TERM(SHARD, GEN), {emqx_ds_storage_layer, SHARD, GEN}). - --spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok. 
-meta_register_gen(Shard, GenId, Gen) -> - Gs = - case GenId > 0 of - true -> meta_lookup(Shard, GenId - 1); - false -> [] - end, - ok = meta_put(Shard, GenId, [Gen | Gs]), - ok = meta_put(Shard, current, GenId). - --spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}. -meta_lookup_gen(Shard, Time) -> - %% TODO - %% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning - %% towards a "no". - Current = meta_lookup(Shard, current), - Gens = meta_lookup(Shard, Current), - find_gen(Time, Current, Gens). - -find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> - {GenId, Gen}; -find_gen(Time, GenId, [_Gen | Rest]) -> - find_gen(Time, GenId - 1, Rest). - --spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined. -meta_get_gen(Shard, GenId) -> - case meta_lookup(Shard, GenId, []) of - [Gen | _Older] -> Gen; - [] -> undefined - end. - --spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined. -meta_get_current(Shard) -> - meta_lookup(Shard, current, undefined). - --spec meta_lookup(emqx_ds:shard(), _K) -> _V. -meta_lookup(Shard, Key) -> - persistent_term:get(?PERSISTENT_TERM(Shard, Key)). - --spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default. -meta_lookup(Shard, K, Default) -> - persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). - --spec meta_put(emqx_ds:shard(), _K, _V) -> ok. -meta_put(Shard, K, V) -> - persistent_term:put(?PERSISTENT_TERM(Shard, K), V). - --spec meta_erase(emqx_ds:shard()) -> ok. -meta_erase(Shard) -> - [ - persistent_term:erase(K) - || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard - ], - ok. - --undef(PERSISTENT_TERM). - -get_next_id(undefined) -> 0; -get_next_id(GenId) -> GenId + 1. 
- -is_gen_valid(Shard, GenId, Since) when GenId > 0 -> - [GenPrev | _] = meta_lookup(Shard, GenId - 1), - case GenPrev of - #{since := SincePrev} when Since > SincePrev -> - ok; - #{} -> - {error, nonmonotonic} - end; -is_gen_valid(_Shard, 0, 0) -> - ok. - -serialize(Msg) -> - %% TODO: remove topic, GUID, etc. from the stored - %% message. Reconstruct it from the metadata. - term_to_binary(emqx_message:to_map(Msg)). - -deserialize(Bin) -> - emqx_message:from_map(binary_to_term(Bin)). - - -%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. -%% store_cfs(DBHandle, CFRefs) -> -%% lists:foreach( -%% fun({CFName, CFRef}) -> -%% persistent_term:put({self(), CFName}, {DBHandle, CFRef}) -%% end, -%% CFRefs). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ deleted file mode 100644 index bdf5a1453..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ +++ /dev/null @@ -1,748 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ds_message_storage_bitmask). - -%%================================================================================ -%% @doc Description of the schema -%% -%% Let us assume that `T' is a topic and `t' is time. These are the two -%% dimensions used to index messages. They can be viewed as -%% "coordinates" of an MQTT message in a 2D space. -%% -%% Oftentimes, when wildcard subscription is used, keys must be -%% scanned in both dimensions simultaneously. -%% -%% Rocksdb allows to iterate over sorted keys very fast. This means we -%% need to map our two-dimentional keys to a single index that is -%% sorted in a way that helps to iterate over both time and topic -%% without having to do a lot of random seeks. 
-%% -%% == Mapping of 2D keys to rocksdb keys == -%% -%% We use "zigzag" pattern to store messages, where rocksdb key is -%% composed like like this: -%% -%% |ttttt|TTTTTTTTT|tttt| -%% ^ ^ ^ -%% | | | -%% +-------+ | +---------+ -%% | | | -%% most significant topic hash least significant -%% bits of timestamp bits of timestamp -%% (a.k.a epoch) (a.k.a time offset) -%% -%% Topic hash is level-aware: each topic level is hashed separately -%% and the resulting hashes are bitwise-concatentated. This allows us -%% to map topics to fixed-length bitstrings while keeping some degree -%% of information about the hierarchy. -%% -%% Next important concept is what we call "epoch". Duration of the -%% epoch is determined by maximum time offset. Epoch is calculated by -%% shifting bits of the timestamp right. -%% -%% The resulting index is a space-filling curve that looks like -%% this in the topic-time 2D space: -%% -%% T ^ ---->------ |---->------ |---->------ -%% | --/ / --/ / --/ -%% | -<-/ | -<-/ | -<-/ -%% | -/ | -/ | -/ -%% | ---->------ | ---->------ | ---->------ -%% | --/ / --/ / --/ -%% | ---/ | ---/ | ---/ -%% | -/ ^ -/ ^ -/ -%% | ---->------ | ---->------ | ---->------ -%% | --/ / --/ / --/ -%% | -<-/ | -<-/ | -<-/ -%% | -/ | -/ | -/ -%% | ---->------| ---->------| ----------> -%% | -%% -+------------+-----------------------------> t -%% epoch -%% -%% This structure allows to quickly seek to a the first message that -%% was recorded in a certain epoch in a certain topic or a -%% group of topics matching filter like `foo/bar/#`. -%% -%% Due to its structure, for each pair of rocksdb keys K1 and K2, such -%% that K1 > K2 and topic(K1) = topic(K2), timestamp(K1) > -%% timestamp(K2). -%% That is, replay doesn't reorder messages published in each -%% individual topic. -%% -%% This property doesn't hold between different topics, but it's not deemed -%% a problem right now. 
-%% -%%================================================================================ - --behaviour(emqx_ds_storage_layer). - -%% API: --export([create_new/3, open/5]). --export([make_keymapper/1]). - --export([store/5, delete/4]). - --export([get_streams/3, make_iterator/3, next/1]). - --export([preserve_iterator/1, restore_iterator/2, refresh_iterator/1]). - -%% Debug/troubleshooting: -%% Keymappers --export([ - keymapper_info/1, - compute_bitstring/3, - compute_topic_bitmask/2, - compute_time_bitmask/1, - hash/2 -]). - -%% Keyspace filters --export([ - make_keyspace_filter/2, - compute_initial_seek/1, - compute_next_seek/2, - compute_time_seek/3, - compute_topic_seek/4 -]). - --export_type([db/0, stream/0, iterator/0, schema/0]). - --export_type([options/0]). --export_type([iteration_options/0]). - --compile( - {inline, [ - bitwise_concat/3, - ones/1, - successor/1, - topic_hash_matches/3, - time_matches/3 - ]} -). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --opaque stream() :: emqx_ds:topic_filter(). - --type topic() :: emqx_ds:topic(). --type topic_filter() :: emqx_ds:topic_filter(). --type time() :: emqx_ds:time(). - -%% Number of bits --type bits() :: non_neg_integer(). - -%% Key of a RocksDB record. --type key() :: binary(). - -%% Distribution of entropy among topic levels. -%% Example: [4, 8, 16] means that level 1 gets 4 bits, level 2 gets 8 bits, -%% and _rest of levels_ (if any) get 16 bits. --type bits_per_level() :: [bits(), ...]. - --type options() :: #{ - %% Number of bits in a message timestamp. - timestamp_bits := bits(), - %% Number of bits in a key allocated to each level in a message topic. - topic_bits_per_level := bits_per_level(), - %% Maximum granularity of iteration over time. 
- epoch := time(), - - iteration => iteration_options(), - - cf_options => emqx_ds_storage_layer:db_cf_options() -}. - --type iteration_options() :: #{ - %% Request periodic iterator refresh. - %% This might be helpful during replays taking a lot of time (e.g. tens of seconds). - %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not - %% the same as 1000 replayed messages. - iterator_refresh => {every, _NumOperations :: pos_integer()} -}. - -%% Persistent configuration of the generation, it is used to create db -%% record when the database is reopened --record(schema, {keymapper :: keymapper()}). - --opaque schema() :: #schema{}. - --record(db, { - shard :: emqx_ds:shard(), - handle :: rocksdb:db_handle(), - cf :: rocksdb:cf_handle(), - keymapper :: keymapper(), - write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), - read_options = [] :: emqx_ds_storage_layer:db_read_options() -}). - --record(it, { - handle :: rocksdb:itr_handle(), - filter :: keyspace_filter(), - cursor :: binary() | undefined, - next_action :: {seek, binary()} | next, - refresh_counter :: {non_neg_integer(), pos_integer()} | undefined -}). - --record(filter, { - keymapper :: keymapper(), - topic_filter :: topic_filter(), - start_time :: integer(), - hash_bitfilter :: integer(), - hash_bitmask :: integer(), - time_bitfilter :: integer(), - time_bitmask :: integer() -}). - -% NOTE -% Keymapper decides how to map messages into RocksDB column family keyspace. --record(keymapper, { - source :: [bitsource(), ...], - bitsize :: bits(), - epoch :: non_neg_integer() -}). - --type bitsource() :: - %% Consume `_Size` bits from timestamp starting at `_Offset`th bit. - %% TODO consistency - {timestamp, _Offset :: bits(), _Size :: bits()} - %% Consume next topic level (either one or all of them) and compute `_Size` bits-wide hash. - | {hash, level | levels, _Size :: bits()}. - --opaque db() :: #db{}. --opaque iterator() :: #it{}. 
--type serialized_iterator() :: binary(). --type keymapper() :: #keymapper{}. --type keyspace_filter() :: #filter{}. - -%%================================================================================ -%% API funcions -%%================================================================================ - -%% Create a new column family for the generation and a serializable representation of the schema --spec create_new(rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options()) -> - {schema(), emqx_ds_storage_layer:cf_refs()}. -create_new(DBHandle, GenId, Options) -> - CFName = data_cf(GenId), - CFOptions = maps:get(cf_options, Options, []), - {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, CFOptions), - Schema = #schema{keymapper = make_keymapper(Options)}, - {Schema, [{CFName, CFHandle}]}. - -%% Reopen the database --spec open( - emqx_ds:shard(), - rocksdb:db_handle(), - emqx_ds_storage_layer:gen_id(), - emqx_ds_storage_layer:cf_refs(), - schema() -) -> - db(). -open(Shard, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> - {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), - #db{ - shard = Shard, - handle = DBHandle, - cf = CFHandle, - keymapper = Keymapper - }. - --spec make_keymapper(options()) -> keymapper(). -make_keymapper(#{ - timestamp_bits := TimestampBits, - topic_bits_per_level := BitsPerLevel, - epoch := MaxEpoch -}) -> - TimestampLSBs = min(TimestampBits, floor(math:log2(MaxEpoch))), - TimestampMSBs = TimestampBits - TimestampLSBs, - NLevels = length(BitsPerLevel), - {LevelBits, [TailLevelsBits]} = lists:split(NLevels - 1, BitsPerLevel), - Source = lists:flatten([ - [{timestamp, TimestampLSBs, TimestampMSBs} || TimestampMSBs > 0], - [{hash, level, Bits} || Bits <- LevelBits], - {hash, levels, TailLevelsBits}, - [{timestamp, 0, TimestampLSBs} || TimestampLSBs > 0] - ]), - #keymapper{ - source = Source, - bitsize = lists:sum([S || {_, _, S} <- Source]), - epoch = 1 bsl TimestampLSBs - }. 
- --spec store(db(), emqx_guid:guid(), emqx_ds:time(), topic(), binary()) -> - ok | {error, _TODO}. -store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, MessagePayload) -> - Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), - Value = make_message_value(Topic, MessagePayload), - rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). - --spec delete(db(), emqx_guid:guid(), emqx_ds:time(), topic()) -> - ok | {error, _TODO}. -delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic) -> - Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), - rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options). - --spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> - [stream()]. -get_streams(_, TopicFilter, _) -> - [{0, TopicFilter}]. - --spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> - % {error, invalid_start_time}? might just start from the beginning of time - % and call it a day: client violated the contract anyway. - {ok, iterator()} | {error, _TODO}. -make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, Replay, Options) -> - case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of - {ok, ITHandle} -> - Filter = make_keyspace_filter(Replay, DB#db.keymapper), - InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), - RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), - {ok, #it{ - handle = ITHandle, - filter = Filter, - next_action = {seek, InitialSeek}, - refresh_counter = RefreshCounter - }}; - Err -> - Err - end. - --spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. 
-next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> - It = maybe_refresh_iterator(It0), - case rocksdb:iterator_move(It#it.handle, It#it.next_action) of - % spec says `{ok, Key}` is also possible but the implementation says it's not - {ok, Key, Value} -> - % Preserve last seen key in the iterator so it could be restored / refreshed later. - ItNext = It#it{cursor = Key}, - Bitstring = extract(Key, Keymapper), - case match_next(Bitstring, Value, It#it.filter) of - {_Topic, Payload} -> - {value, Payload, ItNext#it{next_action = next}}; - next -> - next(ItNext#it{next_action = next}); - NextBitstring when is_integer(NextBitstring) -> - NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(ItNext#it{next_action = {seek, NextSeek}}); - none -> - stop_iteration(ItNext) - end; - {error, invalid_iterator} -> - stop_iteration(It); - {error, iterator_closed} -> - {error, closed} - end. - --spec preserve_iterator(iterator()) -> serialized_iterator(). -preserve_iterator(#it{ - cursor = Cursor, - filter = #filter{ - topic_filter = TopicFilter, - start_time = StartTime - } -}) -> - State = #{ - v => 1, - cursor => Cursor, - replay => {TopicFilter, StartTime} - }, - term_to_binary(State). - --spec restore_iterator(db(), serialized_iterator()) -> - {ok, iterator()} | {error, _TODO}. -restore_iterator(DB, Serial) when is_binary(Serial) -> - State = binary_to_term(Serial), - restore_iterator(DB, State); -restore_iterator(DB, #{ - v := 1, - cursor := Cursor, - replay := Replay = {_TopicFilter, _StartTime} -}) -> - Options = #{}, % TODO: passthrough options - case make_iterator(DB, Replay, Options) of - {ok, It} when Cursor == undefined -> - % Iterator was preserved right after it has been made. - {ok, It}; - {ok, It} -> - % Iterator was preserved mid-replay, seek right past the last seen key. - {ok, It#it{cursor = Cursor, next_action = {seek, successor(Cursor)}}}; - Err -> - Err - end. - --spec refresh_iterator(iterator()) -> iterator(). 
-refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) -> - case rocksdb:iterator_refresh(Handle) of - ok when Action =:= next -> - % Now the underlying iterator is invalid, need to seek instead. - It#it{next_action = {seek, successor(Cursor)}}; - ok -> - % Now the underlying iterator is invalid, but will seek soon anyway. - It; - {error, _} -> - % Implementation could in theory return an {error, ...} tuple. - % Supposedly our best bet is to ignore it. - % TODO logging? - It - end. - -%%================================================================================ -%% Internal exports -%%================================================================================ - --spec keymapper_info(keymapper()) -> - #{source := [bitsource()], bitsize := bits(), epoch := time()}. -keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) -> - #{source => Source, bitsize => Bitsize, epoch => Epoch}. - -make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> - combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). - -make_message_value(Topic, MessagePayload) -> - term_to_binary({Topic, MessagePayload}). - -unwrap_message_value(Binary) -> - binary_to_term(Binary). - --spec combine(_Bitstring :: integer(), emqx_guid:guid() | <<>>, keymapper()) -> - key(). -combine(Bitstring, MessageID, #keymapper{bitsize = Size}) -> - <<Bitstring:Size/integer, MessageID/binary>>. - --spec extract(key(), keymapper()) -> - _Bitstring :: integer(). -extract(Key, #keymapper{bitsize = Size}) -> - <<Bitstring:Size/integer, _MessageID/binary>> = Key, - Bitstring. - --spec compute_bitstring(topic_filter(), time(), keymapper()) -> integer(). -compute_bitstring(TopicFilter, Timestamp, #keymapper{source = Source}) -> - compute_bitstring(TopicFilter, Timestamp, Source, 0). - --spec compute_topic_bitmask(topic_filter(), keymapper()) -> integer(). -compute_topic_bitmask(TopicFilter, #keymapper{source = Source}) -> - compute_topic_bitmask(TopicFilter, Source, 0).
- --spec compute_time_bitmask(keymapper()) -> integer(). -compute_time_bitmask(#keymapper{source = Source}) -> - compute_time_bitmask(Source, 0). - --spec hash(term(), bits()) -> integer(). -hash(Input, Bits) -> - % at most 32 bits - erlang:phash2(Input, 1 bsl Bits). - --spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). -make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> - Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), - HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), - TimeBitmask = compute_time_bitmask(Keymapper), - HashBitfilter = Bitstring band HashBitmask, - TimeBitfilter = Bitstring band TimeBitmask, - #filter{ - keymapper = Keymapper, - topic_filter = TopicFilter, - start_time = StartTime, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask - }. - --spec compute_initial_seek(keyspace_filter()) -> integer(). -compute_initial_seek(#filter{hash_bitfilter = HashBitfilter, time_bitfilter = TimeBitfilter}) -> - % Should be the same as `compute_initial_seek(0, Filter)`. - HashBitfilter bor TimeBitfilter. - --spec compute_next_seek(integer(), keyspace_filter()) -> integer(). -compute_next_seek( - Bitstring, - Filter = #filter{ - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask - } -) -> - HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), - TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), - compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter). 
- -%%================================================================================ -%% Internal functions -%%================================================================================ - -compute_bitstring(Topic, Timestamp, [{timestamp, Offset, Size} | Rest], Acc) -> - I = (Timestamp bsr Offset) band ones(Size), - compute_bitstring(Topic, Timestamp, Rest, bitwise_concat(Acc, I, Size)); -compute_bitstring([], Timestamp, [{hash, level, Size} | Rest], Acc) -> - I = hash(<<"/">>, Size), - compute_bitstring([], Timestamp, Rest, bitwise_concat(Acc, I, Size)); -compute_bitstring([Level | Tail], Timestamp, [{hash, level, Size} | Rest], Acc) -> - I = hash(Level, Size), - compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size)); -compute_bitstring(Tail, Timestamp, [{hash, levels, Size} | Rest], Acc) -> - I = hash(Tail, Size), - compute_bitstring(Tail, Timestamp, Rest, bitwise_concat(Acc, I, Size)); -compute_bitstring(_, _, [], Acc) -> - Acc. - -compute_topic_bitmask(Filter, [{timestamp, _, Size} | Rest], Acc) -> - compute_topic_bitmask(Filter, Rest, bitwise_concat(Acc, 0, Size)); -compute_topic_bitmask(['#'], [{hash, _, Size} | Rest], Acc) -> - compute_topic_bitmask(['#'], Rest, bitwise_concat(Acc, 0, Size)); -compute_topic_bitmask(['+' | Tail], [{hash, _, Size} | Rest], Acc) -> - compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, 0, Size)); -compute_topic_bitmask([], [{hash, level, Size} | Rest], Acc) -> - compute_topic_bitmask([], Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_topic_bitmask([_ | Tail], [{hash, level, Size} | Rest], Acc) -> - compute_topic_bitmask(Tail, Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_topic_bitmask(Tail, [{hash, levels, Size} | Rest], Acc) -> - Mask = - case lists:member('+', Tail) orelse lists:member('#', Tail) of - true -> 0; - false -> ones(Size) - end, - compute_topic_bitmask([], Rest, bitwise_concat(Acc, Mask, Size)); -compute_topic_bitmask(_, [], Acc) -> - Acc. 
- -compute_time_bitmask([{timestamp, _, Size} | Rest], Acc) -> - compute_time_bitmask(Rest, bitwise_concat(Acc, ones(Size), Size)); -compute_time_bitmask([{hash, _, Size} | Rest], Acc) -> - compute_time_bitmask(Rest, bitwise_concat(Acc, 0, Size)); -compute_time_bitmask([], Acc) -> - Acc. - -bitwise_concat(Acc, Item, ItemSize) -> - (Acc bsl ItemSize) bor Item. - -ones(Bits) -> - 1 bsl Bits - 1. - --spec successor(key()) -> key(). -successor(Key) -> - <<Key/binary, 0:8>>. - -%% |123|345|678| -%% foo bar baz - -%% |123|000|678| - |123|fff|678| - -%% foo + baz - -%% |fff|000|fff| - -%% |123|000|678| - -%% |123|056|678| & |fff|000|fff| = |123|000|678|. - -match_next( - Bitstring, - Value, - Filter = #filter{ - topic_filter = TopicFilter, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask - } -) -> - HashMatches = topic_hash_matches(Bitstring, HashBitfilter, HashBitmask), - TimeMatches = time_matches(Bitstring, TimeBitfilter, TimeBitmask), - case HashMatches and TimeMatches of - true -> - Message = {Topic, _Payload} = unwrap_message_value(Value), - case emqx_topic:match(Topic, TopicFilter) of - true -> - Message; - false -> - next - end; - false -> - compute_next_seek(HashMatches, TimeMatches, Bitstring, Filter) - end. - -%% `Bitstring` is out of the hash space defined by `HashBitfilter`. -compute_next_seek( - _HashMatches = false, - _TimeMatches, - Bitstring, - Filter = #filter{ - keymapper = Keymapper, - hash_bitfilter = HashBitfilter, - hash_bitmask = HashBitmask, - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask - } -) -> - NextBitstring = compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper), - case NextBitstring of - none -> - none; - _ -> - TimeMatches = time_matches(NextBitstring, TimeBitfilter, TimeBitmask), - compute_next_seek(true, TimeMatches, NextBitstring, Filter) - end; -%% `Bitstring` is out of the time range defined by `TimeBitfilter`.
-compute_next_seek( - _HashMatches = true, - _TimeMatches = false, - Bitstring, - #filter{ - time_bitfilter = TimeBitfilter, - time_bitmask = TimeBitmask - } -) -> - compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask); -compute_next_seek(true, true, Bitstring, _It) -> - Bitstring. - -topic_hash_matches(Bitstring, HashBitfilter, HashBitmask) -> - (Bitstring band HashBitmask) == HashBitfilter. - -time_matches(Bitstring, TimeBitfilter, TimeBitmask) -> - (Bitstring band TimeBitmask) >= TimeBitfilter. - -compute_time_seek(Bitstring, TimeBitfilter, TimeBitmask) -> - % Replace the bits of the timestamp in `Bistring` with bits from `Timebitfilter`. - (Bitstring band (bnot TimeBitmask)) bor TimeBitfilter. - -%% Find the closest bitstring which is: -%% * greater than `Bitstring`, -%% * and falls into the hash space defined by `HashBitfilter`. -%% Note that the result can end up "back" in time and out of the time range. -compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Keymapper) -> - Sources = Keymapper#keymapper.source, - Size = Keymapper#keymapper.bitsize, - compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size). - -compute_topic_seek(Bitstring, HashBitfilter, HashBitmask, Sources, Size) -> - % NOTE - % We're iterating through `Substring` here, in lockstep with `HashBitfilter` - % and `HashBitmask`, starting from least signigicant bits. Each bitsource in - % `Sources` has a bitsize `S` and, accordingly, gives us a sub-bitstring `S` - % bits long which we interpret as a "digit". There are 2 flavors of those - % "digits": - % * regular digit with 2^S possible values, - % * degenerate digit with exactly 1 possible value U (represented with 0). - % Our goal here is to find a successor of `Bistring` and perform a kind of - % digit-by-digit addition operation with carry propagation. 
- NextSeek = zipfoldr3( - fun(Source, Substring, Filter, LBitmask, Offset, Acc) -> - case Source of - {hash, _, S} when LBitmask =:= 0 -> - % Regular case - bitwise_add_digit(Substring, Acc, S, Offset); - {hash, _, _} when LBitmask =/= 0, Substring < Filter -> - % Degenerate case, I_digit < U, no overflow. - % Successor is `U bsl Offset` which is equivalent to 0. - 0; - {hash, _, S} when LBitmask =/= 0, Substring > Filter -> - % Degenerate case, I_digit > U, overflow. - % Successor is `(1 bsl Size + U) bsl Offset`. - overflow_digit(S, Offset); - {hash, _, S} when LBitmask =/= 0 -> - % Degenerate case, I_digit = U - % Perform digit addition with I_digit = 0, assuming "digit" has - % 0 bits of information (but is `S` bits long at the same time). - % This will overflow only if the result of previous iteration - % was an overflow. - bitwise_add_digit(0, Acc, 0, S, Offset); - {timestamp, _, S} -> - % Regular case - bitwise_add_digit(Substring, Acc, S, Offset) - end - end, - 0, - Bitstring, - HashBitfilter, - HashBitmask, - Size, - Sources - ), - case NextSeek bsr Size of - _Carry = 0 -> - % Found the successor. - % We need to recover values of those degenerate digits which we - % represented with 0 during digit-by-digit iteration. - NextSeek bor (HashBitfilter band HashBitmask); - _Carry = 1 -> - % We got "carried away" past the range, time to stop iteration. - none - end. - -bitwise_add_digit(Digit, Number, Width, Offset) -> - bitwise_add_digit(Digit, Number, Width, Width, Offset). - -%% Add "digit" (represented with integer `Digit`) to the `Number` assuming -%% this digit starts at `Offset` bits in `Number` and is `Width` bits long. -%% Perform an overflow if the result of addition would not fit into `Bits` -%% bits. -bitwise_add_digit(Digit, Number, Bits, Width, Offset) -> - Sum = (Digit bsl Offset) + Number, - case (Sum bsr Offset) < (1 bsl Bits) of - true -> Sum; - false -> overflow_digit(Width, Offset) - end. 
- -%% Constuct a number which denotes an overflow of digit that starts at -%% `Offset` bits and is `Width` bits long. -overflow_digit(Width, Offset) -> - (1 bsl Width) bsl Offset. - -%% Iterate through sub-bitstrings of 3 integers in lockstep, starting from least -%% significant bits first. -%% -%% Each integer is assumed to be `Size` bits long. Lengths of sub-bitstring are -%% specified in `Sources` list, in order from most significant bits to least -%% significant. Each iteration calls `FoldFun` with: -%% * bitsource that was used to extract sub-bitstrings, -%% * 3 sub-bitstrings in integer representation, -%% * bit offset into integers, -%% * current accumulator. --spec zipfoldr3(FoldFun, Acc, integer(), integer(), integer(), _Size :: bits(), [bitsource()]) -> - Acc -when - FoldFun :: fun((bitsource(), integer(), integer(), integer(), _Offset :: bits(), Acc) -> Acc). -zipfoldr3(_FoldFun, Acc, _, _, _, 0, []) -> - Acc; -zipfoldr3(FoldFun, Acc, I1, I2, I3, Offset, [Source = {_, _, S} | Rest]) -> - OffsetNext = Offset - S, - AccNext = zipfoldr3(FoldFun, Acc, I1, I2, I3, OffsetNext, Rest), - FoldFun( - Source, - substring(I1, OffsetNext, S), - substring(I2, OffsetNext, S), - substring(I3, OffsetNext, S), - OffsetNext, - AccNext - ). - -substring(I, Offset, Size) -> - (I bsr Offset) band ones(Size). - -%% @doc Generate a column family ID for the MQTT messages --spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. -data_cf(GenId) -> - ?MODULE_STRING ++ integer_to_list(GenId). - -make_refresh_counter({every, N}) when is_integer(N), N > 0 -> - {0, N}; -make_refresh_counter(undefined) -> - undefined. - -maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) -> - refresh_iterator(It#it{refresh_counter = {0, N}}); -maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) -> - It#it{refresh_counter = {M + 1, N}}; -maybe_refresh_iterator(It = #it{refresh_counter = undefined}) -> - It. - -stop_iteration(It) -> - ok = rocksdb:iterator_close(It#it.handle), - none. 
diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 957383f30..ac037e861 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -129,7 +129,7 @@ t_get_streams(_Config) -> t_replay(_Config) -> %% Create concrete topics: Topics = [<<"foo/bar">>, <<"foo/bar/baz">>], - Timestamps = lists:seq(1, 10), + Timestamps = lists:seq(1, 10_000, 100), Batch1 = [ make_message(PublishedAt, Topic, integer_to_binary(PublishedAt)) || Topic <- Topics, PublishedAt <- Timestamps @@ -140,10 +140,10 @@ t_replay(_Config) -> begin B = integer_to_binary(I), make_message( - TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS) + TS, <<"wildcard/", B/binary, "/suffix/", Suffix/binary>>, integer_to_binary(TS) ) end - || I <- lists:seq(1, 200), TS <- lists:seq(1, 10), Suffix <- [<<"foo">>, <<"bar">>] + || I <- lists:seq(1, 200), TS <- Timestamps, Suffix <- [<<"foo">>, <<"bar">>] ], ok = emqx_ds_storage_layer:store_batch(?SHARD, Batch2, []), %% Check various topic filters: @@ -158,6 +158,9 @@ t_replay(_Config) -> ?assert(check(?SHARD, <<"foo/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/+">>, 0, Messages)), ?assert(check(?SHARD, <<"+/+/baz">>, 0, Messages)), + %% Restart shard to make sure trie is persisted and restored: + ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}), %% Learned wildcard topics: ?assertNot(check(?SHARD, <<"wildcard/1000/suffix/foo">>, 0, [])), ?assert(check(?SHARD, <<"wildcard/1/suffix/foo">>, 0, Messages)), @@ -179,23 +182,24 @@ check(Shard, TopicFilter, StartTime, ExpectedMessages) -> ExpectedMessages ), ?check_trace( - #{timetrap => 10_000}, - begin - Dump = dump_messages(Shard, TopicFilter, StartTime), - verify_dump(TopicFilter, StartTime, Dump), - Missing = 
ExpectedFiltered -- Dump, - Extras = Dump -- ExpectedFiltered, - ?assertMatch( - #{missing := [], unexpected := []}, - #{ - missing => Missing, - unexpected => Extras, - topic_filter => TopicFilter, - start_time => StartTime - } - ) - end, - []), + #{timetrap => 10_000}, + begin + Dump = dump_messages(Shard, TopicFilter, StartTime), + verify_dump(TopicFilter, StartTime, Dump), + Missing = ExpectedFiltered -- Dump, + Extras = Dump -- ExpectedFiltered, + ?assertMatch( + #{missing := [], unexpected := []}, + #{ + missing => Missing, + unexpected => Extras, + topic_filter => TopicFilter, + start_time => StartTime + } + ) + end, + [] + ), length(ExpectedFiltered) > 0. verify_dump(TopicFilter, StartTime, Dump) -> @@ -227,78 +231,26 @@ dump_messages(Shard, TopicFilter, StartTime) -> ). dump_stream(Shard, Stream, TopicFilter, StartTime) -> - BatchSize = 3, + BatchSize = 100, {ok, Iterator} = emqx_ds_storage_layer:make_iterator( Shard, Stream, parse_topic(TopicFilter), StartTime ), - Loop = fun F(It, 0) -> - error({too_many_iterations, It}); - F(It, N) -> - case emqx_ds_storage_layer:next(Shard, It, BatchSize) of - end_of_stream -> - []; - {ok, _NextIt, []} -> - []; - {ok, NextIt, Batch} -> - Batch ++ F(NextIt, N - 1) - end + Loop = fun + F(It, 0) -> + error({too_many_iterations, It}); + F(It, N) -> + case emqx_ds_storage_layer:next(Shard, It, BatchSize) of + end_of_stream -> + []; + {ok, _NextIt, []} -> + []; + {ok, NextIt, Batch} -> + Batch ++ F(NextIt, N - 1) + end end, - MaxIterations = 1000, + MaxIterations = 1000000, Loop(Iterator, MaxIterations). 
-%% Smoke test for iteration with wildcard topic filter -%% t_iterate_wildcard(_Config) -> -%% %% Prepare data: -%% Topics = ["foo/bar", "foo/bar/baz", "a", "a/bar"], -%% Timestamps = lists:seq(1, 10), -%% _ = [ -%% store(?SHARD, PublishedAt, Topic, term_to_binary({Topic, PublishedAt})) -%% || Topic <- Topics, PublishedAt <- Timestamps -%% ], -%% ?assertEqual( -%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- Timestamps]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 0)]) -%% ), -%% ?assertEqual( -%% [], -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 10 + 1)]) -%% ), -%% ?assertEqual( -%% lists:sort([{Topic, PublishedAt} || Topic <- Topics, PublishedAt <- lists:seq(5, 10)]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "#", 5)]) -%% ), -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["foo/bar", "foo/bar/baz"], PublishedAt <- Timestamps -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/#", 0)]) -%% ), -%% ?assertEqual( -%% lists:sort([{"foo/bar", PublishedAt} || PublishedAt <- Timestamps]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+", 0)]) -%% ), -%% ?assertEqual( -%% [], -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "foo/+/bar", 0)]) -%% ), -%% ?assertEqual( -%% lists:sort([ -%% {Topic, PublishedAt} -%% || Topic <- ["foo/bar", "foo/bar/baz", "a/bar"], PublishedAt <- Timestamps -%% ]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "+/bar/#", 0)]) -%% ), -%% ?assertEqual( -%% lists:sort([{Topic, PublishedAt} || Topic <- ["a", "a/bar"], PublishedAt <- Timestamps]), -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/#", 0)]) -%% ), -%% ?assertEqual( -%% [], -%% lists:sort([binary_to_term(Payload) || Payload <- iterate(?SHARD, "a/+/+", 0)]) -%% ), -%% ok. 
- %% t_create_gen(_Config) -> %% {ok, 1} = emqx_ds_storage_layer:create_generation(?SHARD, 5, ?DEFAULT_CONFIG), %% ?assertEqual(