From d504d415e6b367ccb54caaf65637b06886ef0ce2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 10 Jan 2023 11:57:04 +0300 Subject: [PATCH 1/6] feat: enable periodic iterator refresh This might be helpful during replays taking multiple tens of seconds so that underlying iterators won't hold onto in-memory / on-disk data structures for too long, preventing rocksdb from recycling them. --- .../src/emqx_replay_message_storage.erl | 71 ++++++++++++++++--- .../props/prop_replay_message_storage.erl | 39 ++++++++++ 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index fe0a0e08a..f58c006cd 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -95,10 +95,12 @@ -export([store/5]). -export([make_iterator/3]). +-export([make_iterator/4]). -export([next/1]). -export([preserve_iterator/1]). -export([restore_iterator/2]). +-export([refresh_iterator/1]). %% Debug/troubleshooting: %% Keymappers @@ -159,9 +161,18 @@ topic_bits_per_level := bits_per_level(), %% Maximum granularity of iteration over time. epoch := time(), + cf_options => emqx_replay_local_store:db_cf_options() }. +-type iteration_options() :: #{ + %% Request periodic iterator refresh. + %% This might be helpful during replays taking a lot of time (e.g. tens of seconds). + %% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not + %% the same as 1000 replayed messages. + iterator_refresh => {every, _NumOperations :: pos_integer()} +}. + %% Persistent configuration of the generation, it is used to create db %% record when the database is reopened -record(schema, {keymapper :: keymapper()}). @@ -173,14 +184,16 @@ cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), - read_options = [] :: emqx_replay_local_store:db_write_options() + read_options = [] :: emqx_replay_local_store:db_write_options(), + iteration_options = #{} :: iteration_options() }). -record(it, { handle :: rocksdb:itr_handle(), filter :: keyspace_filter(), cursor :: binary() | undefined, - next_action :: {seek, binary()} | next + next_action :: {seek, binary()} | next, + refresh_counter :: {non_neg_integer(), pos_integer()} | undefined }). -record(filter, { @@ -274,41 +287,51 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> + {ok, iterator()} | {error, _TODO}. +make_iterator(DB, TopicFilter, StartTime) -> + % TODO wire it up somehow to the upper level + make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options). + +-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. -make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) -> +make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) -> case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of {ok, ITHandle} -> % TODO earliest Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper), InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper), + RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)), {ok, #it{ handle = ITHandle, filter = Filter, - next_action = {seek, InitialSeek} + next_action = {seek, InitialSeek}, + refresh_counter = RefreshCounter }}; Err -> Err end. -spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}. -next(It = #it{filter = #filter{keymapper = Keymapper}}) -> +next(It0 = #it{filter = #filter{keymapper = Keymapper}}) -> + It = maybe_refresh_iterator(It0), case rocksdb:iterator_move(It#it.handle, It#it.next_action) of % spec says `{ok, Key}` is also possible but the implementation says it's not {ok, Key, Value} -> + % Preserve last seen key in the iterator so it could be restored / refreshed later. + ItNext = It#it{cursor = Key}, Bitstring = extract(Key, Keymapper), case match_next(Bitstring, Value, It#it.filter) of {_Topic, Payload} -> - % Preserve last seen key in the iterator so it could be restored later. - {value, Payload, It#it{cursor = Key, next_action = next}}; + {value, Payload, ItNext#it{next_action = next}}; next -> - next(It#it{next_action = next}); + next(ItNext#it{next_action = next}); NextBitstring when is_integer(NextBitstring) -> NextSeek = combine(NextBitstring, <<>>, Keymapper), - next(It#it{next_action = {seek, NextSeek}}); + next(ItNext#it{next_action = {seek, NextSeek}}); none -> - stop_iteration(It) + stop_iteration(ItNext) end; {error, invalid_iterator} -> stop_iteration(It); @@ -347,6 +370,22 @@ restore_iterator(DB, #{ Err end. +-spec refresh_iterator(iterator()) -> iterator(). +refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) -> + case rocksdb:iterator_refresh(Handle) of + ok when Action =:= next -> + % Now the underlying iterator is invalid, need to seek instead. + It#it{next_action = {seek, successor(Cursor)}}; + ok -> + % Now the underlying iterator is invalid, but will seek soon anyway. + It; + {error, _} -> + % Implementation could in theory return an {error, ...} tuple. + % Supposedly our best bet is to ignore it. + % TODO logging? + It + end. + %%================================================================================ %% Internal exports %%================================================================================ @@ -687,6 +726,18 @@ substring(I, Offset, Size) -> data_cf(GenId) -> ?MODULE_STRING ++ integer_to_list(GenId). +make_refresh_counter({every, N}) when is_integer(N), N > 0 -> + {0, N}; +make_refresh_counter(undefined) -> + undefined. + +maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) -> + refresh_iterator(It#it{refresh_counter = {0, N}}); +maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) -> + It#it{refresh_counter = {M + 1, N}}; +maybe_refresh_iterator(It = #it{refresh_counter = undefined}) -> + It. + stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 9619c4f05..20c897c2a 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -150,6 +150,41 @@ prop_iterate_eq_iterate_with_preserve_restore() -> ) end). +prop_iterate_eq_iterate_with_refresh() -> + TBPL = [4, 8, 16, 12], + Options = #{ + timestamp_bits => 32, + topic_bits_per_level => TBPL, + epoch => 500 + }, + {DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options), + ?FORALL(Stream, non_empty(messages(topic(TBPL))), begin + % TODO + % This proptest is also impure, see above. + ok = store_db(DB, Stream), + ?FORALL( + { + {Topic, _}, + Pat, + StartTime, + RefreshEvery + }, + { + nth(Stream), + topic_filter_pattern(), + start_time(), + pos_integer() + }, + ?TIMEOUT(5000, begin + TopicFilter = make_topic_filter(Pat, Topic), + IterationOptions = #{iterator_refresh => {every, RefreshEvery}}, + Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions), + Messages = iterate_db(Iterator), + equals(Messages, iterate_db(DB, TopicFilter, StartTime)) + end) + ) + end). + % store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) -> % MessageID = emqx_guid:gen(), % PublishedAt = ChunkNum, @@ -184,6 +219,10 @@ make_iterator(DB, TopicFilter, StartTime) -> {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime), It. +make_iterator(DB, TopicFilter, StartTime, Options) -> + {ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options), + It. + run_iterator_commands([iterate | Rest], It, DB) -> case emqx_replay_message_storage:next(It) of {value, Payload, ItNext} -> From 0d495c97c8dd807bab08eb543dd4a3f94fe6365c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 12 Jan 2023 17:12:45 +0300 Subject: [PATCH 2/6] chore: rename testsuite to reflect test subject better --- ...play_storage_SUITE.erl => emqx_replay_local_store_SUITE.erl} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename apps/emqx_replay/test/{emqx_replay_storage_SUITE.erl => emqx_replay_local_store_SUITE.erl} (99%) diff --git a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl similarity index 99% rename from apps/emqx_replay/test/emqx_replay_storage_SUITE.erl rename to apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index 3d7e7cb41..5a1bb59f4 100644 --- a/apps/emqx_replay/test/emqx_replay_storage_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_replay_storage_SUITE). +-module(emqx_replay_local_store_SUITE). -compile(export_all). -compile(nowarn_export_all). From b7566ab7e7d46392b938f3984344393cfa13af31 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 12 Jan 2023 17:59:15 +0300 Subject: [PATCH 3/6] test: provide more general `keymapper_info/1` --- .../src/emqx_replay_message_storage.erl | 8 ++++---- .../test/props/prop_replay_message_storage.erl | 14 +++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index f58c006cd..d14a07c6d 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -105,7 +105,7 @@ %% Debug/troubleshooting: %% Keymappers -export([ - bitsize/1, + keymapper_info/1, compute_bitstring/3, compute_topic_bitmask/2, compute_time_bitmask/1, @@ -390,9 +390,9 @@ refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action %% Internal exports %%================================================================================ --spec bitsize(keymapper()) -> bits(). -bitsize(#keymapper{bitsize = Bitsize}) -> - Bitsize. +-spec keymapper_info(keymapper()) -> [bitsource()]. +keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) -> + #{source => Source, bitsize => Bitsize, epoch => Epoch}. make_message_key(Topic, PublishedAt, MessageID, Keymapper) -> combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper). diff --git a/apps/emqx_replay/test/props/prop_replay_message_storage.erl b/apps/emqx_replay/test/props/prop_replay_message_storage.erl index 20c897c2a..c468097c7 100644 --- a/apps/emqx_replay/test/props/prop_replay_message_storage.erl +++ b/apps/emqx_replay/test/props/prop_replay_message_storage.erl @@ -28,13 +28,14 @@ %%-------------------------------------------------------------------- prop_bitstring_computes() -> - ?FORALL(Keymapper, keymapper(), begin - Bitsize = emqx_replay_message_storage:bitsize(Keymapper), + ?FORALL( + Keymapper, + keymapper(), ?FORALL({Topic, Timestamp}, {topic(), integer()}, begin BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper), - is_integer(BS) andalso (BS < (1 bsl Bitsize)) + is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper))) end) - end). + ). prop_topic_bitmask_computes() -> Keymapper = make_keymapper(16, [8, 12, 16], 100), @@ -56,7 +57,7 @@ prop_next_seek_monotonic() -> ), ?FORALL( Bitstring, - bitstr(emqx_replay_message_storage:bitsize(Keymapper)), + bitstr(get_keymapper_bitsize(Keymapper)), emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring ) end @@ -436,6 +437,9 @@ make_keymapper(TimestampBits, TopicBits, MaxEpoch) -> epoch => MaxEpoch }). +get_keymapper_bitsize(Keymapper) -> + maps:get(bitsize, emqx_replay_message_storage:keymapper_info(Keymapper)). + -spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}). interleave(Seqs, Rng) -> interleave(Seqs, length(Seqs), Rng). From d950efc9fa0666f729d56629818680696f4695b1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 12 Jan 2023 17:59:55 +0300 Subject: [PATCH 4/6] test: split unit tests off into a full-fledged suite --- .../src/emqx_replay_message_storage.erl | 160 -------------- .../emqx_replay_message_storage_SUITE.erl | 200 ++++++++++++++++++ 2 files changed, 200 insertions(+), 160 deletions(-) create mode 100644 apps/emqx_replay/test/emqx_replay_message_storage_SUITE.erl diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index d14a07c6d..b1c5f1806 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -741,163 +741,3 @@ maybe_refresh_iterator(It = #it{refresh_counter = undefined}) -> stop_iteration(It) -> ok = rocksdb:iterator_close(It#it.handle), none. - --ifdef(TEST). - --include_lib("eunit/include/eunit.hrl"). - -make_keymapper_test_() -> - [ - ?_assertEqual( - #keymapper{ - source = [ - {timestamp, 9, 23}, - {hash, level, 2}, - {hash, level, 4}, - {hash, levels, 8}, - {timestamp, 0, 9} - ], - bitsize = 46, - epoch = 512 - }, - make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [2, 4, 8], - epoch => 1000 - }) - ), - ?_assertEqual( - #keymapper{ - source = [ - {timestamp, 0, 32}, - {hash, levels, 16} - ], - bitsize = 48, - epoch = 1 - }, - make_keymapper(#{ - timestamp_bits => 32, - topic_bits_per_level => [16], - epoch => 1 - }) - ) - ]. - -compute_test_bitmask(TopicFilter) -> - compute_topic_bitmask( - TopicFilter, - [ - {hash, level, 3}, - {hash, level, 4}, - {hash, level, 5}, - {hash, levels, 2} - ], - 0 - ). - -bitmask_test_() -> - [ - ?_assertEqual( - 2#111_1111_11111_11, - compute_test_bitmask([<<"foo">>, <<"bar">>]) - ), - ?_assertEqual( - 2#111_0000_11111_11, - compute_test_bitmask([<<"foo">>, '+']) - ), - ?_assertEqual( - 2#111_0000_00000_11, - compute_test_bitmask([<<"foo">>, '+', '+']) - ), - ?_assertEqual( - 2#111_0000_11111_00, - compute_test_bitmask([<<"foo">>, '+', <<"bar">>, '+']) - ) - ]. - -wildcard_bitmask_test_() -> - [ - ?_assertEqual( - 2#000_0000_00000_00, - compute_test_bitmask(['#']) - ), - ?_assertEqual( - 2#111_0000_00000_00, - compute_test_bitmask([<<"foo">>, '#']) - ), - ?_assertEqual( - 2#111_1111_11111_00, - compute_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#']) - ), - ?_assertEqual( - 2#111_1111_11111_11, - compute_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#']) - ) - ]. - -%% Filter = |123|***|678|***| -%% Mask = |123|***|678|***| -%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000| -%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000| -%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos -%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos - -compute_test_topic_seek(Bitstring, Bitfilter, HBitmask) -> - compute_topic_seek( - Bitstring, - Bitfilter, - HBitmask, - [ - {hash, level, 8}, - {hash, level, 8}, - {hash, level, 16}, - {hash, levels, 12} - ], - 8 + 8 + 16 + 12 - ). - -next_seek_test_() -> - [ - ?_assertMatch( - none, - compute_test_topic_seek( - 16#FD_42_4242_043, - 16#FD_42_4242_042, - 16#FF_FF_FFFF_FFF - ) - ), - ?_assertMatch( - 16#FD_11_0678_000, - compute_test_topic_seek( - 16#FD_11_0108_121, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000 - ) - ), - ?_assertMatch( - 16#FD_12_0678_000, - compute_test_topic_seek( - 16#FD_11_0679_919, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000 - ) - ), - ?_assertMatch( - none, - compute_test_topic_seek( - 16#FD_FF_0679_001, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000 - ) - ), - ?_assertMatch( - none, - compute_test_topic_seek( - 16#FE_11_0179_017, - 16#FD_00_0678_000, - 16#FF_00_FFFF_000 - ) - ) - ]. - --endif. diff --git a/apps/emqx_replay/test/emqx_replay_message_storage_SUITE.erl b/apps/emqx_replay/test/emqx_replay_message_storage_SUITE.erl new file mode 100644 index 000000000..3fca48a7b --- /dev/null +++ b/apps/emqx_replay/test/emqx_replay_message_storage_SUITE.erl @@ -0,0 +1,200 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_replay_message_storage_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("stdlib/include/assert.hrl"). + +-import(emqx_replay_message_storage, [ + make_keymapper/1, + keymapper_info/1, + compute_topic_bitmask/2, + compute_time_bitmask/1, + compute_topic_seek/4 +]). + +all() -> emqx_common_test_helpers:all(?MODULE). + +t_make_keymapper(_) -> + ?assertMatch( + #{ + source := [ + {timestamp, 9, 23}, + {hash, level, 2}, + {hash, level, 4}, + {hash, levels, 8}, + {timestamp, 0, 9} + ], + bitsize := 46, + epoch := 512 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [2, 4, 8], + epoch => 1000 + }) + ) + ). + +t_make_keymapper_single_hash_level(_) -> + ?assertMatch( + #{ + source := [ + {timestamp, 0, 32}, + {hash, levels, 16} + ], + bitsize := 48, + epoch := 1 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 32, + topic_bits_per_level => [16], + epoch => 1 + }) + ) + ). + +t_make_keymapper_no_timestamp(_) -> + ?assertMatch( + #{ + source := [ + {hash, level, 4}, + {hash, level, 8}, + {hash, levels, 16} + ], + bitsize := 28, + epoch := 1 + }, + keymapper_info( + make_keymapper(#{ + timestamp_bits => 0, + topic_bits_per_level => [4, 8, 16], + epoch => 42 + }) + ) + ). + +t_compute_topic_bitmask(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#111_1111_11111_11, + compute_topic_bitmask([<<"foo">>, <<"bar">>], KM) + ), + ?assertEqual( + 2#111_0000_11111_11, + compute_topic_bitmask([<<"foo">>, '+'], KM) + ), + ?assertEqual( + 2#111_0000_00000_11, + compute_topic_bitmask([<<"foo">>, '+', '+'], KM) + ), + ?assertEqual( + 2#111_0000_11111_00, + compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM) + ). + +t_compute_topic_bitmask_wildcard(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#000_0000_00000_00, + compute_topic_bitmask(['#'], KM) + ), + ?assertEqual( + 2#111_0000_00000_00, + compute_topic_bitmask([<<"foo">>, '#'], KM) + ), + ?assertEqual( + 2#111_1111_11111_00, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM) + ). + +t_compute_topic_bitmask_wildcard_long_tail(_) -> + KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}), + ?assertEqual( + 2#111_1111_11111_11, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM) + ), + ?assertEqual( + 2#111_1111_11111_00, + compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM) + ). + +t_compute_time_bitmask(_) -> + KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}), + ?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)). + +t_compute_time_bitmask_epoch_only(_) -> + KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}), + ?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)). + +%% Filter = |123|***|678|***| +%% Mask = |123|***|678|***| +%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000| +%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000| +%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos +%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos + +t_compute_next_topic_seek(_) -> + KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}), + ?assertMatch( + none, + compute_topic_seek( + 16#FD_42_4242_043, + 16#FD_42_4242_042, + 16#FF_FF_FFFF_FFF, + KM + ) + ), + ?assertMatch( + 16#FD_11_0678_000, + compute_topic_seek( + 16#FD_11_0108_121, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + 16#FD_12_0678_000, + compute_topic_seek( + 16#FD_11_0679_919, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + none, + compute_topic_seek( + 16#FD_FF_0679_001, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ), + ?assertMatch( + none, + compute_topic_seek( + 16#FE_11_0179_017, + 16#FD_00_0678_000, + 16#FF_00_FFFF_000, + KM + ) + ). From 464db76a520a7c5ac52703d36c90cb868ca17fa0 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 16 Jan 2023 17:25:55 +0300 Subject: [PATCH 5/6] feat: wire iteration options up to the app config --- apps/emqx_replay/src/emqx_replay_conf.erl | 46 ++++++++++++++----- .../src/emqx_replay_local_store.erl | 2 +- .../src/emqx_replay_message_storage.erl | 20 +++++--- .../test/emqx_replay_local_store_SUITE.erl | 5 +- 4 files changed, 53 insertions(+), 20 deletions(-) diff --git a/apps/emqx_replay/src/emqx_replay_conf.erl b/apps/emqx_replay/src/emqx_replay_conf.erl index 8f7105312..57ba87ddf 100644 --- a/apps/emqx_replay/src/emqx_replay_conf.erl +++ b/apps/emqx_replay/src/emqx_replay_conf.erl @@ -20,27 +20,51 @@ %% API: -export([zone_config/1, db_options/0]). +-export([zone_iteration_options/1]). +-export([default_iteration_options/0]). + %%================================================================================ %% API funcions %%================================================================================ -define(APP, emqx_replay). --spec zone_config(emqx_types:zone()) -> - {module(), term()}. +-type zone() :: emqx_types:zone(). +-type config() :: + {emqx_replay_message_storage, emqx_replay_message_storage:options()} + | {module(), _Options}. + +-spec zone_config(zone()) -> config(). zone_config(Zone) -> - DefaultConf = - #{ - timestamp_bits => 64, - topic_bits_per_level => [8, 8, 8, 32, 16], - epoch => 5 - }, - DefaultZoneConfig = application:get_env( - ?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf} - ), + DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()), Zones = application:get_env(?APP, zone_config, #{}), maps:get(Zone, Zones, DefaultZoneConfig). +-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options(). +zone_iteration_options(Zone) -> + case zone_config(Zone) of + {emqx_replay_message_storage, Config} -> + maps:get(iteration, Config, default_iteration_options()); + {_Module, _} -> + default_iteration_options() + end. + +-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options(). +default_iteration_options() -> + {emqx_replay_message_storage, Config} = default_zone_config(), + maps:get(iteration, Config). + +-spec default_zone_config() -> config(). +default_zone_config() -> + {emqx_replay_message_storage, #{ + timestamp_bits => 64, + topic_bits_per_level => [8, 8, 8, 32, 16], + epoch => 5, + iteration => #{ + iterator_refresh => {every, 100} + } + }}. + -spec db_options() -> emqx_replay_local_store:db_options(). db_options() -> application:get_env(?APP, db_options, []). diff --git a/apps/emqx_replay/src/emqx_replay_local_store.erl b/apps/emqx_replay/src/emqx_replay_local_store.erl index 23cedb04c..15a400a92 100644 --- a/apps/emqx_replay/src/emqx_replay_local_store.erl +++ b/apps/emqx_replay/src/emqx_replay_local_store.erl @@ -150,7 +150,7 @@ read_metadata(S) -> -spec read_metadata(gen_id(), #s{}) -> ok. read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) -> Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId), - DB = Mod:open(DBHandle, GenId, CFs, Data), + DB = Mod:open(Zone, DBHandle, GenId, CFs, Data), meta_put(Zone, GenId, Gen#generation{data = DB}). -spec ensure_current_generation(#s{}) -> #s{}. diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index b1c5f1806..dd6c41598 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -90,7 +90,7 @@ %%================================================================================ %% API: --export([create_new/3, open/4]). +-export([create_new/3, open/5]). -export([make_keymapper/1]). -export([store/5]). @@ -123,6 +123,9 @@ -export_type([db/0, iterator/0, schema/0]). +-export_type([options/0]). +-export_type([iteration_options/0]). + -compile( {inline, [ bitwise_concat/3, @@ -162,6 +165,8 @@ %% Maximum granularity of iteration over time. epoch := time(), + iteration => iteration_options(), + cf_options => emqx_replay_local_store:db_cf_options() }. @@ -180,12 +185,12 @@ -opaque schema() :: #schema{}. -record(db, { + zone :: emqx_types:zone(), handle :: rocksdb:db_handle(), cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_replay_local_store:db_write_options(), - read_options = [] :: emqx_replay_local_store:db_write_options(), - iteration_options = #{} :: iteration_options() + read_options = [] :: emqx_replay_local_store:db_write_options() }). -record(it, { @@ -233,7 +238,6 @@ %% Create a new column family for the generation and a serializable representation of the schema -spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) -> {schema(), emqx_replay_local_store:cf_refs()}. -%{schema(), emqx_replay_local_store:cf_refs()}. create_new(DBHandle, GenId, Options) -> CFName = data_cf(GenId), CFOptions = maps:get(cf_options, Options, []), @@ -243,15 +247,17 @@ create_new(DBHandle, GenId, Options) -> %% Reopen the database -spec open( + emqx_types:zone(), rocksdb:db_handle(), emqx_replay_local_store:gen_id(), emqx_replay_local_store:cf_refs(), schema() ) -> db(). -open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> +open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) -> {value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs), #db{ + zone = Zone, handle = DBHandle, cf = CFHandle, keymapper = Keymapper @@ -289,8 +295,8 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, -spec make_iterator(db(), emqx_topic:words(), time() | earliest) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, TopicFilter, StartTime) -> - % TODO wire it up somehow to the upper level - make_iterator(DB, TopicFilter, StartTime, DB#db.iteration_options). + Options = emqx_replay_conf:zone_iteration_options(DB#db.zone), + make_iterator(DB, TopicFilter, StartTime, Options). -spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time diff --git a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl index 5a1bb59f4..eee802e69 100644 --- a/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl +++ b/apps/emqx_replay/test/emqx_replay_local_store_SUITE.erl @@ -164,7 +164,10 @@ init_per_testcase(TC, Config) -> ok = set_zone_config(zone(TC), #{ timestamp_bits => 64, topic_bits_per_level => [8, 8, 32, 16], - epoch => 5 + epoch => 5, + iteration => #{ + iterator_refresh => {every, 5} + } }), {ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)), Config. From 16736eca0f30d3e5384f1eb8ab811eea1281e174 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 16 Jan 2023 17:26:57 +0300 Subject: [PATCH 6/6] fix: correct typespec --- apps/emqx_replay/src/emqx_replay_message_storage.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_replay/src/emqx_replay_message_storage.erl b/apps/emqx_replay/src/emqx_replay_message_storage.erl index dd6c41598..f2a6afaa6 100644 --- a/apps/emqx_replay/src/emqx_replay_message_storage.erl +++ b/apps/emqx_replay/src/emqx_replay_message_storage.erl @@ -396,7 +396,8 @@ refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action %% Internal exports %%================================================================================ --spec keymapper_info(keymapper()) -> [bitsource()]. +-spec keymapper_info(keymapper()) -> + #{source := [bitsource()], bitsize := bits(), epoch := time()}. keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) -> #{source => Source, bitsize => Bitsize, epoch => Epoch}.