diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index b1a003e93..941573bf8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -43,6 +43,7 @@ stream/0, stream_rank/0, iterator/0, + message_id/0, next_result/1, next_result/0, store_batch_result/0, make_iterator_result/1, make_iterator_result/0 diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index a512a141c..e18c8498d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -105,6 +105,8 @@ ]} ). +-elvis([{elvis_style, no_if_expression, disable}]). + -ifdef(TEST). -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -139,7 +141,9 @@ dst_offset :: offset() }). --type scanner() :: [[#scan_action{}]]. +-type scan_action() :: #scan_action{}. + +-type scanner() :: [[scan_action()]]. -record(keymapper, { schema :: [bitsource()], @@ -259,7 +263,9 @@ key_to_bitstring(#keymapper{size = Size}, Key) -> %% @doc Create a filter object that facilitates range scans. -spec make_filter(keymapper(), [scalar_range()]) -> filter(). -make_filter(KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = Size}, Filter0) -> +make_filter( + KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size = TotalSize}, Filter0 +) -> NDim = length(DimSizeof), %% Transform "symbolic" inequations to ranges: Filter1 = inequations_to_ranges(KeyMapper, Filter0), @@ -326,7 +332,7 @@ make_filter(KeyMapper = #keymapper{schema = Schema, dim_sizeof = DimSizeof, size end, %% Final value #filter{ - size = Size, + size = TotalSize, bitmask = Bitmask, bitfilter = Bitfilter, bitsource_ranges = Ranges, @@ -420,7 +426,7 @@ ratchet_scan(Ranges, NDim, Key, I, Pivot0, Carry) -> %% Note: this function operates in bitsource basis, scanning it from %% NDim to 0. It applies the transformation specified by %% `ratchet_scan'. -ratchet_do(Ranges, Key, I, _Pivot, _Increment) when I < 0 -> +ratchet_do(_Ranges, _Key, I, _Pivot, _Increment) when I < 0 -> 0; ratchet_do(Ranges, Key, I, Pivot, Increment) -> #filter_scan_action{offset = Offset, size = Size, min = Min} = array:get(I, Ranges), @@ -495,12 +501,12 @@ do_vector_to_key([Action | Actions], Scanner, Coord, Vector, Acc0) -> Acc = Acc0 bor extract(Coord, Action), do_vector_to_key(Actions, Scanner, Coord, Vector, Acc). --spec extract(_Source :: scalar(), #scan_action{}) -> integer(). +-spec extract(_Source :: scalar(), scan_action()) -> integer(). extract(Src, #scan_action{src_bitmask = SrcBitmask, src_offset = SrcOffset, dst_offset = DstOffset}) -> ((Src bsr SrcOffset) band SrcBitmask) bsl DstOffset. %% extract^-1 --spec extract_inv(_Dest :: scalar(), #scan_action{}) -> integer(). +-spec extract_inv(_Dest :: scalar(), scan_action()) -> integer(). extract_inv(Dest, #scan_action{ src_bitmask = SrcBitmask, src_offset = SrcOffset, dst_offset = DestOffset }) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index c9a73e3e0..d06854fd0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -32,6 +32,8 @@ -include_lib("eunit/include/eunit.hrl"). -endif. +-elvis([{elvis_style, variable_naming_convention, disable}]). + %%================================================================================ %% Type declarations %%================================================================================ @@ -601,7 +603,11 @@ test_key(Trie, Threshold, Topic0) -> fun(Old) -> case Old =:= Topic of true -> Old; - false -> error(#{'$msg' => "Duplicate key!", key => Ret, old_topic => Old, new_topic => Topic}) + false -> error(#{ '$msg' => "Duplicate key!" + , key => Ret + , old_topic => Old + , new_topic => Topic + }) end end, Topic, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 8d406c93e..b85fb48b0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -21,7 +21,7 @@ %% used for testing. -module(emqx_ds_storage_bitfield_lts). --behavior(emqx_ds_storage_layer). +-behaviour(emqx_ds_storage_layer). %% API: -export([]). @@ -65,6 +65,8 @@ keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()) }). +-type s() :: #s{}. + -record(stream, { storage_key :: emqx_ds_lts:msg_storage_key() }). @@ -76,9 +78,7 @@ last_seen_key = <<>> :: binary() }). --define(QUICKCHECK_KEY(KEY, BITMASK, BITFILTER), - ((KEY band BITMASK) =:= BITFILTER) -). +-type iterator() :: #it{}. -define(COUNTER, emqx_ds_storage_bitfield_lts_counter). @@ -92,6 +92,13 @@ %% behavior callbacks %%================================================================================ +-spec create( + emqx_ds_replication_layer:shard_id(), + rocksdb:db_handle(), + emqx_ds_storage_layer:gen_id(), + options() +) -> + {schema(), emqx_ds_storage_layer:cf_refs()}. create(_ShardId, DBHandle, GenId, Options) -> %% Get options: BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64), @@ -112,6 +119,14 @@ create(_ShardId, DBHandle, GenId, Options) -> }, {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}. +-spec open( + emqx_ds_replication_layer:shard_id(), + rocksdb:db_handle(), + emqx_ds_storage_layer:gen_id(), + emqx_ds_storage_layer:cf_refs(), + schema() +) -> + s(). open(_Shard, DBHandle, GenId, CFRefs, Schema) -> #{ bits_per_wildcard_level := BitsPerTopicLevel, @@ -134,6 +149,10 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> ), #s{db = DBHandle, data = DataCF, trie = Trie, keymappers = KeymapperCache}. +-spec store_batch( + emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() +) -> + emqx_ds:store_batch_result(). store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> lists:foreach( fun(Msg) -> @@ -203,7 +222,7 @@ next(_Shard, #s{db = DB, data = CF, keymappers = Keymappers}, It0, BatchSize) -> %% Internal functions %%================================================================================ -next_loop(ITHandle, KeyMapper, Filter, It, Acc, 0) -> +next_loop(_ITHandle, _KeyMapper, _Filter, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, It0, Acc0, N0) -> inc_counter(), @@ -249,8 +268,8 @@ traverse_interval(ITHandle, KeyMapper, Filter, It0, Acc, N) -> {0, It0, Acc} end. --spec check_message(emqx_ds_bitmask_keymapper:filter(), #it{}, binary()) -> - {true, #message{}} | false. +-spec check_message(emqx_ds_bitmask_keymapper:filter(), iterator(), binary()) -> + {true, emqx_types:message()} | false. check_message(Filter, #it{last_seen_key = Key}, Val) -> case emqx_ds_bitmask_keymapper:bin_checkmask(Filter, Key) of true -> @@ -270,7 +289,7 @@ format_keyfilter(any) -> format_keyfilter({Op, Val}) -> {Op, integer_to_list(Val, 16)}. --spec make_key(#s{}, #message{}) -> {binary(), [binary()]}. +-spec make_key(s(), emqx_types:message()) -> {binary(), [binary()]}. make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestamp, topic = TopicBin}) -> Tokens = emqx_topic:tokens(TopicBin), {TopicIndex, Varying} = emqx_ds_lts:topic_key(Trie, fun threshold_fun/1, Tokens), @@ -345,7 +364,7 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) -> {binary_to_term(KeyB), binary_to_term(ValB)} | read_persisted_trie(IT, rocksdb:iterator_move(IT, next)) ]; -read_persisted_trie(IT, {error, invalid_iterator}) -> +read_persisted_trie(_IT, {error, invalid_iterator}) -> []. inc_counter() -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 9c7fc3158..ec00f1310 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -21,7 +21,7 @@ %% used for testing. -module(emqx_ds_storage_reference). --behavior(emqx_ds_storage_layer). +-behaviour(emqx_ds_storage_layer). %% API: -export([]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index ac037e861..6dc24a269 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -354,36 +354,6 @@ store(Shard, PublishedAt, Topic, Payload) -> }, emqx_ds_storage_layer:message_store(Shard, [Msg], #{}). -%% iterate(Shard, TopicFilter, StartTime) -> -%% Streams = emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime), -%% lists:flatmap( -%% fun(Stream) -> -%% iterate(Shard, iterator(Shard, Stream, TopicFilter, StartTime)) -%% end, -%% Streams). - -%% iterate(Shard, It) -> -%% case emqx_ds_storage_layer:next(Shard, It) of -%% {ok, ItNext, [#message{payload = Payload}]} -> -%% [Payload | iterate(Shard, ItNext)]; -%% end_of_stream -> -%% [] -%% end. - -%% iterate(_Shard, end_of_stream, _N) -> -%% {end_of_stream, []}; -%% iterate(Shard, It, N) -> -%% case emqx_ds_storage_layer:next(Shard, It, N) of -%% {ok, ItFinal, Messages} -> -%% {ItFinal, [Payload || #message{payload = Payload} <- Messages]}; -%% end_of_stream -> -%% {end_of_stream, []} -%% end. - -%% iterator(Shard, Stream, TopicFilter, StartTime) -> -%% {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Stream, parse_topic(TopicFilter), StartTime), -%% It. - payloads(Messages) -> lists:map( fun(#message{payload = P}) ->