From 99329e1243d373c426d17620e1b6aef932509ff8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 30 Oct 2023 21:43:16 +0100 Subject: [PATCH] refactor(ds): Address review remarks --- .../emqx_persistent_message_ds_replayer.erl | 8 +- apps/emqx/src/emqx_persistent_session_ds.erl | 3 +- .../src/emqx_ds_bitmask_keymapper.erl | 120 +++++++++++++----- .../src/emqx_ds_helper.erl | 73 ----------- .../src/emqx_ds_replication_layer.erl | 3 + .../src/emqx_ds_storage_bitfield_lts.erl | 17 ++- .../src/emqx_ds_storage_layer.erl | 8 +- .../src/proto/emqx_ds_proto_v1.erl | 3 +- tdd | 13 -- 9 files changed, 114 insertions(+), 134 deletions(-) delete mode 100644 apps/emqx_durable_storage/src/emqx_ds_helper.erl delete mode 100755 tdd diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index ce57eaa80..d137891a2 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -72,7 +72,13 @@ replay(_SessionId, _Inflight = #inflight{offset_ranges = _Ranges}) -> -spec commit_offset(emqx_persistent_session_ds:id(), emqx_types:packet_id(), inflight()) -> {_IsValidOffset :: boolean(), inflight()}. 
-commit_offset(SessionId, PacketId, Inflight0 = #inflight{acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0}) -> +commit_offset( + SessionId, + PacketId, + Inflight0 = #inflight{ + acked_seqno = AckedSeqno0, next_seqno = NextSeqNo, offset_ranges = Ranges0 + } +) -> AckedSeqno = packet_id_to_seqno(NextSeqNo, PacketId), true = AckedSeqno0 < AckedSeqno, Ranges = lists:filter( diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b8afc771f..c99b8c947 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -281,7 +281,8 @@ publish(_PacketId, Msg, Session) -> puback(_ClientInfo, PacketId, Session = #{id := Id, inflight := Inflight0}) -> case emqx_persistent_message_ds_replayer:commit_offset(Id, PacketId, Inflight0) of {true, Inflight} -> - Msg = #message{}, %% TODO + %% TODO + Msg = #message{}, {ok, Msg, [], Session#{inflight => Inflight}}; {false, _} -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index 90c381104..a67dbc0eb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -168,6 +168,10 @@ %% transformation from a list of bitsources. %% %% Note: Dimension is 1-based. +%% +%% Note: order of bitsources is important. First element of the list +%% is mapped to the _least_ significant bits of the key, and the last +%% element becomes most significant bits. -spec make_keymapper([bitsource()]) -> keymapper(). make_keymapper(Bitsources) -> Arr0 = array:new([{fixed, false}, {default, {0, []}}]), @@ -207,12 +211,13 @@ vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) -> %% @doc Same as `vector_to_key', but it works with binaries, and outputs a binary. 
-spec bin_vector_to_key(keymapper(), [binary()]) -> binary(). bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) -> - Vec = lists:map( - fun({Bin, SizeOf}) -> + Vec = lists:zipwith( + fun(Bin, SizeOf) -> <> = Bin, Int end, - lists:zip(Binaries, DimSizeof) + Binaries, + DimSizeof ), Key = vector_to_key(Keymapper, Vec), <>. @@ -241,13 +246,15 @@ key_to_vector(#keymapper{scanner = Scanner}, Key) -> bin_key_to_vector(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, BinKey) -> <> = BinKey, Vector = key_to_vector(Keymapper, Key), - lists:map( - fun({Elem, SizeOf}) -> + lists:zipwith( + fun(Elem, SizeOf) -> <> end, - lists:zip(Vector, DimSizeof) + Vector, + DimSizeof ). +%% @doc Transform a bitstring to a key -spec bitstring_to_key(keymapper(), bitstring()) -> key(). bitstring_to_key(#keymapper{size = Size}, Bin) -> case Bin of @@ -257,6 +264,7 @@ bitstring_to_key(#keymapper{size = Size}, Bin) -> error({invalid_key, Bin, Size}) end. +%% @doc Transform key to a fixed-size bitstring -spec key_to_bitstring(keymapper(), key()) -> bitstring(). key_to_bitstring(#keymapper{size = Size}, Key) -> <>. 
@@ -267,13 +275,15 @@ make_filter( KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = TotalSize}, Filter0 ) -> NDim = length(DimSizeof), - %% Transform "symbolic" inequations to ranges: - Filter1 = inequations_to_ranges(KeyMapper, Filter0), + %% Transform "symbolic" constraints to ranges: + Filter1 = constraints_to_ranges(KeyMapper, Filter0), {Bitmask, Bitfilter} = make_bitfilter(KeyMapper, Filter1), %% Calculate maximum source offset as per bitsource specification: MaxOffset = lists:foldl( fun({Dim, Offset, _Size}, Acc) -> - maps:update_with(Dim, fun(OldVal) -> max(OldVal, Offset) end, 0, Acc) + maps:update_with( + Dim, fun(OldVal) -> max(OldVal, Offset) end, maps:merge(#{Dim => 0}, Acc) + ) end, #{}, Schema @@ -288,11 +298,11 @@ make_filter( %% %% This is needed so when we increment the vector, we always scan %% the full range of least significant bits. - Filter2 = lists:map( + Filter2 = lists:zipwith( fun - ({{Val, Val}, _Dim}) -> + ({Val, Val}, _Dim) -> {Val, Val}; - ({{Min0, Max0}, Dim}) -> + ({Min0, Max0}, Dim) -> Offset = maps:get(Dim, MaxOffset, 0), %% Set least significant bits of Min to 0: Min = (Min0 bsr Offset) bsl Offset, @@ -300,7 +310,8 @@ make_filter( Max = Max0 bor ones(Offset), {Min, Max} end, - lists:zip(Filter1, lists:seq(1, NDim)) + Filter1, + lists:seq(1, NDim) ), %% Project the vector into "bitsource coordinate system": {_, Filter} = fold_bitsources( @@ -340,10 +351,37 @@ make_filter( range_max = RangeMax }. +%% @doc Given a filter `F' and key `K0', return the smallest key `K' +%% that satisfies the following conditions: +%% +%% 1. `K >= K0' +%% +%% 2. `K' satisfies filter `F'. +%% +%% If these conditions cannot be satisfied, return `overflow'. +%% +%% Corollary: `K' may be equal to `K0'. -spec ratchet(filter(), key()) -> key() | overflow. 
ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Max -> + %% This function works in two steps: first, it finds the position + %% of bitsource ("pivot point") corresponding to the part of the + %% key that should be incremented (or set to the _minimum_ value + %% of the range, in case the respective part of the original key + %% is less than the minimum). It also returns "increment": value + %% that should be added to the part of the key at the pivot point. + %% Increment can be 0 or 1. + %% + %% Then it transforms the key using the following operation: + %% + %% 1. Parts of the key that are less than the pivot point are + %% reset to their minimum values. + %% + %% 2. `Increment' is added to the part of the key at the pivot + %% point. + %% + %% 3. The rest of key stays the same NDim = array:size(Ranges), - case ratchet_scan(Ranges, NDim, Key, 0, _Pivot = {-1, 0}, _Carry = 0) of + case ratchet_scan(Ranges, NDim, Key, 0, {_Pivot0 = -1, _Increment0 = 0}, _Carry = 0) of overflow -> overflow; {Pivot, Increment} -> @@ -352,16 +390,21 @@ ratchet(#filter{bitsource_ranges = Ranges, range_max = Max}, Key) when Key =< Ma ratchet(_, _) -> overflow. +%% @doc Given a binary representing a key and a filter, return the +%% next key matching the filter, or `overflow' if such key doesn't +%% exist. -spec bin_increment(filter(), binary()) -> binary() | overflow. 
bin_increment(Filter = #filter{size = Size}, <<>>) -> Key = ratchet(Filter, 0), <>; -bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, KeyBin) -> +bin_increment( + Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter, range_max = RangeMax}, + KeyBin +) -> <> = KeyBin, Key1 = Key0 + 1, if - Key1 band Bitmask =:= Bitfilter -> - %% TODO: check overflow + Key1 band Bitmask =:= Bitfilter, Key1 =< RangeMax -> <>; true -> case ratchet(Filter, Key1) of @@ -372,6 +415,10 @@ bin_increment(Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfi end end. +%% @doc Given a filter and a binary representation of a key, return +%% `false' if the key _doesn't_ match the filter. This function +%% returning `true' is a necessary, but not sufficient, condition for +%% the key to satisfy the filter. -spec bin_checkmask(filter(), binary()) -> boolean(). bin_checkmask(#filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter}, Key) -> case Key of @@ -449,35 +496,37 @@ ratchet_do(Ranges, Key, I, Pivot, Increment) -> -spec make_bitfilter(keymapper(), [{non_neg_integer(), non_neg_integer()}]) -> {non_neg_integer(), non_neg_integer()}. make_bitfilter(Keymapper = #keymapper{dim_sizeof = DimSizeof}, Ranges) -> - L = lists:map( + L = lists:zipwith( fun - ({{N, N}, Bits}) -> + ({N, N}, Bits) -> %% For strict equality we can employ bitmask: {ones(Bits), N}; - (_) -> + (_, _) -> {0, 0} end, - lists:zip(Ranges, DimSizeof) + Ranges, + DimSizeof ), {Bitmask, Bitfilter} = lists:unzip(L), {vector_to_key(Keymapper, Bitmask), vector_to_key(Keymapper, Bitfilter)}. %% Transform inequalities into a list of closed intervals that the %% vector elements should lie in. 
-inequations_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) -> - lists:map( +constraints_to_ranges(#keymapper{dim_sizeof = DimSizeof}, Filter) -> + lists:zipwith( fun - ({any, Bitsize}) -> + (any, Bitsize) -> {0, ones(Bitsize)}; - ({{'=', infinity}, Bitsize}) -> + ({'=', infinity}, Bitsize) -> Val = ones(Bitsize), {Val, Val}; - ({{'=', Val}, _Bitsize}) -> + ({'=', Val}, _Bitsize) -> {Val, Val}; - ({{'>=', Val}, Bitsize}) -> + ({'>=', Val}, Bitsize) -> {Val, ones(Bitsize)} end, - lists:zip(Filter, DimSizeof) + Filter, + DimSizeof ). -spec fold_bitsources(fun((_DstOffset :: non_neg_integer(), bitsource(), Acc) -> Acc), Acc, [ @@ -679,7 +728,7 @@ ratchet1_test() -> ?assertEqual(0, ratchet(F, 0)), ?assertEqual(16#fa, ratchet(F, 16#fa)), ?assertEqual(16#ff, ratchet(F, 16#ff)), - ?assertEqual(overflow, ratchet(F, 16#100), "TBD: filter must store the upper bound"). + ?assertEqual(overflow, ratchet(F, 16#100)). %% erlfmt-ignore ratchet2_test() -> @@ -696,6 +745,11 @@ ratchet2_test() -> ?assertEqual(16#aa11cc00, ratchet(F1, 16#aa10dc11)), ?assertEqual(overflow, ratchet(F1, 16#ab000000)), F2 = make_filter(M, [{'=', 16#aa}, {'>=', 16#dddd}, {'=', 16#cc}]), + %% TODO: note that it's `16#aaddcc00' instead of + %% `16#aaddccdd'. That is because currently the ratchet function + %% doesn't take LSBs of an '>=' interval if it has a hole in the + %% middle (see `make_filter/2'). This only adds extra keys to the + %% very first interval, so it's not deemed a huge problem. ?assertEqual(16#aaddcc00, ratchet(F2, 0)), ?assertEqual(16#aa_de_cc_00, ratchet(F2, 16#aa_dd_cd_11)). @@ -721,18 +775,18 @@ ratchet3_test_() -> %% Note: this function iterates through the full range of keys, so its %% complexity grows _exponentially_ with the total size of the %% keymapper. -test_iterate(Filter, overflow) -> +test_iterate(_Filter, overflow) -> true; test_iterate(Filter, Key0) -> Key = ratchet(Filter, Key0 + 1), ?assert(ratchet_prop(Filter, Key0, Key)), test_iterate(Filter, Key). 
-ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) -> +ratchet_prop(#filter{bitfilter = Bitfilter, bitmask = Bitmask, size = Size}, Key0, Key) -> %% Validate basic properties of the generated key. It must be %% greater than the old key, and match the bitmask: ?assert(Key =:= overflow orelse (Key band Bitmask =:= Bitfilter)), - ?assert(Key > Key0, {Key, '>=', Key}), + ?assert(Key > Key0, {Key, '>=', Key0}), IMax = ones(Size), %% Iterate through all keys between `Key0 + 1' and `Key' and %% validate that none of them match the bitmask. Ultimately, it @@ -750,7 +804,7 @@ ratchet_prop(Filter = #filter{bitfilter = Bitfilter, bitmask = Bitmask, size = S CheckGaps(Key0 + 1). mkbmask(Keymapper, Filter0) -> - Filter = inequations_to_ranges(Keymapper, Filter0), + Filter = constraints_to_ranges(Keymapper, Filter0), make_bitfilter(Keymapper, Filter). key2vec(Schema, Vector) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_helper.erl b/apps/emqx_durable_storage/src/emqx_ds_helper.erl deleted file mode 100644 index 5b55831d1..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_helper.erl +++ /dev/null @@ -1,73 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_ds_helper). 
- -%% API: --export([create_rr/1]). - -%% internal exports: --export([]). - --export_type([rr/0]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --type item() :: {emqx_ds:stream_rank(), emqx_ds:stream()}. - --type rr() :: #{ - queue := #{term() => [{integer(), emqx_ds:stream()}]}, - active_ring := {[item()], [item()]} -}. - -%%================================================================================ -%% API funcions -%%================================================================================ - --spec create_rr([item()]) -> rr(). -create_rr(Streams) -> - RR0 = #{latest_rank => #{}, active_ring => {[], []}}, - add_streams(RR0, Streams). - --spec add_streams(rr(), [item()]) -> rr(). -add_streams(#{queue := Q0, active_ring := R0}, Streams) -> - Q1 = lists:foldl( - fun({{RankX, RankY}, Stream}, Acc) -> - maps:update_with(RankX, fun(L) -> [{RankY, Stream} | L] end, Acc) - end, - Q0, - Streams - ), - Q2 = maps:map( - fun(_RankX, Streams1) -> - lists:usort(Streams1) - end, - Q1 - ), - #{queue => Q2, active_ring => R0}. 
- -%%================================================================================ -%% behavior callbacks -%%================================================================================ - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9b1ff5c7c..34bb66031 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -13,6 +13,9 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + +%% @doc Replication layer for DS backends that don't support +%% replication on their own. -module(emqx_ds_replication_layer). -export([ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index b85fb48b0..85f4f5aa7 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Reference implementation of the storage. -%% -%% Trivial, extremely slow and inefficient. It also doesn't handle -%% restart of the Erlang node properly, so obviously it's only to be -%% used for testing. +%% @doc A storage layout based on learned topic structure and using +%% bitfield mapping for the varying topic layers. 
-module(emqx_ds_storage_bitfield_lts). -behaviour(emqx_ds_storage_layer). @@ -82,6 +79,9 @@ -define(COUNTER, emqx_ds_storage_bitfield_lts_counter). +%% Limit on the number of wildcard levels in the learned topic trie: +-define(WILDCARD_LIMIT, 10). + -include("emqx_ds_bitmask.hrl"). %%================================================================================ @@ -140,7 +140,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> %% If user's topics have more than learned 10 wildcard levels %% (more than 2, really), then it's total carnage; learned topic %% structure won't help. - MaxWildcardLevels = 10, + MaxWildcardLevels = ?WILDCARD_LIMIT, KeymapperCache = array:from_list( [ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) @@ -201,6 +201,9 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> %% levels. Magic constant 2: we have two extra dimensions of topic %% index and time; the rest of dimensions are varying levels. NVarying = length(Inequations) - 2, + %% Assert: + NVarying =< ?WILDCARD_LIMIT orelse + error({too_many_varying_topic_levels, NVarying}), Keymapper = array:get(NVarying, Keymappers), Filter = #filter{range_min = LowerBound, range_max = UpperBound} = emqx_ds_bitmask_keymapper:make_filter( @@ -208,7 +211,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> ), {ok, ITHandle} = rocksdb:iterator(DB, CF, [ {iterate_lower_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, LowerBound)}, - {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound)} + {iterate_upper_bound, emqx_ds_bitmask_keymapper:key_to_bitstring(Keymapper, UpperBound + 1)} ]), try put(?COUNTER, 0), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 8b2e3cc61..32ca85935 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ 
b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -45,7 +45,7 @@ %% Note: this record might be stored permanently on a remote node. -record(stream, { generation :: gen_id(), - enc :: _EncapsultatedData, + enc :: _EncapsulatedData, misc = #{} :: map() }). @@ -54,7 +54,7 @@ %% Note: this record might be stored permanently on a remote node. -record(it, { generation :: gen_id(), - enc :: _EncapsultatedData, + enc :: _EncapsulatedData, misc = #{} :: map() }). @@ -83,10 +83,10 @@ %%%% Shard: -type shard(GenData) :: #{ - %% ID of the current generation (where the new data is written:) + %% ID of the current generation (where the new data is written): current_generation := gen_id(), %% This data is used to create new generation: - prototype := {module(), term()}, + prototype := prototype(), %% Generations: {generation, gen_id()} => GenData }. diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index df9115a78..c79f94377 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -68,5 +68,4 @@ next(Node, Shard, Iter, BatchSize) -> %%================================================================================ introduced_in() -> - %% FIXME - "5.3.0". + "5.4.0". diff --git a/tdd b/tdd deleted file mode 100755 index 197891df6..000000000 --- a/tdd +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -make fmt > /dev/null &>1 & - -./rebar3 ct --name ct@127.0.0.1 --readable=true --suite ./_build/test/lib/emqx/test/emqx_persistent_session_SUITE.beam --case t_publish_while_client_is_gone_qos1 --group tcp - -suites=$(cat <