Merge pull request #9740 from feat/rocksdb-replay-queue/iterator-refresh
feat: enable periodic iterator refresh
This commit is contained in:
commit
a0f97ede67
|
@ -20,27 +20,51 @@
|
|||
%% API:
|
||||
-export([zone_config/1, db_options/0]).
|
||||
|
||||
-export([zone_iteration_options/1]).
|
||||
-export([default_iteration_options/0]).
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
%%================================================================================
|
||||
|
||||
-define(APP, emqx_replay).
|
||||
|
||||
-spec zone_config(emqx_types:zone()) ->
|
||||
{module(), term()}.
|
||||
-type zone() :: emqx_types:zone().
|
||||
-type config() ::
|
||||
{emqx_replay_message_storage, emqx_replay_message_storage:options()}
|
||||
| {module(), _Options}.
|
||||
|
||||
-spec zone_config(zone()) -> config().
|
||||
zone_config(Zone) ->
|
||||
DefaultConf =
|
||||
#{
|
||||
timestamp_bits => 64,
|
||||
topic_bits_per_level => [8, 8, 8, 32, 16],
|
||||
epoch => 5
|
||||
},
|
||||
DefaultZoneConfig = application:get_env(
|
||||
?APP, default_zone_config, {emqx_replay_message_storage, DefaultConf}
|
||||
),
|
||||
DefaultZoneConfig = application:get_env(?APP, default_zone_config, default_zone_config()),
|
||||
Zones = application:get_env(?APP, zone_config, #{}),
|
||||
maps:get(Zone, Zones, DefaultZoneConfig).
|
||||
|
||||
-spec zone_iteration_options(zone()) -> emqx_replay_message_storage:iteration_options().
|
||||
zone_iteration_options(Zone) ->
|
||||
case zone_config(Zone) of
|
||||
{emqx_replay_message_storage, Config} ->
|
||||
maps:get(iteration, Config, default_iteration_options());
|
||||
{_Module, _} ->
|
||||
default_iteration_options()
|
||||
end.
|
||||
|
||||
-spec default_iteration_options() -> emqx_replay_message_storage:iteration_options().
|
||||
default_iteration_options() ->
|
||||
{emqx_replay_message_storage, Config} = default_zone_config(),
|
||||
maps:get(iteration, Config).
|
||||
|
||||
-spec default_zone_config() -> config().
|
||||
default_zone_config() ->
|
||||
{emqx_replay_message_storage, #{
|
||||
timestamp_bits => 64,
|
||||
topic_bits_per_level => [8, 8, 8, 32, 16],
|
||||
epoch => 5,
|
||||
iteration => #{
|
||||
iterator_refresh => {every, 100}
|
||||
}
|
||||
}}.
|
||||
|
||||
-spec db_options() -> emqx_replay_local_store:db_options().
|
||||
db_options() ->
|
||||
application:get_env(?APP, db_options, []).
|
||||
|
|
|
@ -150,7 +150,7 @@ read_metadata(S) ->
|
|||
-spec read_metadata(gen_id(), #s{}) -> ok.
|
||||
read_metadata(GenId, S = #s{zone = Zone, db = DBHandle, column_families = CFs}) ->
|
||||
Gen = #generation{module = Mod, data = Data} = schema_get_gen(DBHandle, GenId),
|
||||
DB = Mod:open(DBHandle, GenId, CFs, Data),
|
||||
DB = Mod:open(Zone, DBHandle, GenId, CFs, Data),
|
||||
meta_put(Zone, GenId, Gen#generation{data = DB}).
|
||||
|
||||
-spec ensure_current_generation(#s{}) -> #s{}.
|
||||
|
|
|
@ -90,20 +90,22 @@
|
|||
%%================================================================================
|
||||
|
||||
%% API:
|
||||
-export([create_new/3, open/4]).
|
||||
-export([create_new/3, open/5]).
|
||||
-export([make_keymapper/1]).
|
||||
|
||||
-export([store/5]).
|
||||
-export([make_iterator/3]).
|
||||
-export([make_iterator/4]).
|
||||
-export([next/1]).
|
||||
|
||||
-export([preserve_iterator/1]).
|
||||
-export([restore_iterator/2]).
|
||||
-export([refresh_iterator/1]).
|
||||
|
||||
%% Debug/troubleshooting:
|
||||
%% Keymappers
|
||||
-export([
|
||||
bitsize/1,
|
||||
keymapper_info/1,
|
||||
compute_bitstring/3,
|
||||
compute_topic_bitmask/2,
|
||||
compute_time_bitmask/1,
|
||||
|
@ -121,6 +123,9 @@
|
|||
|
||||
-export_type([db/0, iterator/0, schema/0]).
|
||||
|
||||
-export_type([options/0]).
|
||||
-export_type([iteration_options/0]).
|
||||
|
||||
-compile(
|
||||
{inline, [
|
||||
bitwise_concat/3,
|
||||
|
@ -159,9 +164,20 @@
|
|||
topic_bits_per_level := bits_per_level(),
|
||||
%% Maximum granularity of iteration over time.
|
||||
epoch := time(),
|
||||
|
||||
iteration => iteration_options(),
|
||||
|
||||
cf_options => emqx_replay_local_store:db_cf_options()
|
||||
}.
|
||||
|
||||
-type iteration_options() :: #{
|
||||
%% Request periodic iterator refresh.
|
||||
%% This might be helpful during replays taking a lot of time (e.g. tens of seconds).
|
||||
%% Note that `{every, 1000}` means 1000 _operations_ with the iterator which is not
|
||||
%% the same as 1000 replayed messages.
|
||||
iterator_refresh => {every, _NumOperations :: pos_integer()}
|
||||
}.
|
||||
|
||||
%% Persistent configuration of the generation, it is used to create db
|
||||
%% record when the database is reopened
|
||||
-record(schema, {keymapper :: keymapper()}).
|
||||
|
@ -169,6 +185,7 @@
|
|||
-opaque schema() :: #schema{}.
|
||||
|
||||
-record(db, {
|
||||
zone :: emqx_types:zone(),
|
||||
handle :: rocksdb:db_handle(),
|
||||
cf :: rocksdb:cf_handle(),
|
||||
keymapper :: keymapper(),
|
||||
|
@ -180,7 +197,8 @@
|
|||
handle :: rocksdb:itr_handle(),
|
||||
filter :: keyspace_filter(),
|
||||
cursor :: binary() | undefined,
|
||||
next_action :: {seek, binary()} | next
|
||||
next_action :: {seek, binary()} | next,
|
||||
refresh_counter :: {non_neg_integer(), pos_integer()} | undefined
|
||||
}).
|
||||
|
||||
-record(filter, {
|
||||
|
@ -220,7 +238,6 @@
|
|||
%% Create a new column family for the generation and a serializable representation of the schema
|
||||
-spec create_new(rocksdb:db_handle(), emqx_replay_local_store:gen_id(), options()) ->
|
||||
{schema(), emqx_replay_local_store:cf_refs()}.
|
||||
%{schema(), emqx_replay_local_store:cf_refs()}.
|
||||
create_new(DBHandle, GenId, Options) ->
|
||||
CFName = data_cf(GenId),
|
||||
CFOptions = maps:get(cf_options, Options, []),
|
||||
|
@ -230,15 +247,17 @@ create_new(DBHandle, GenId, Options) ->
|
|||
|
||||
%% Reopen the database
|
||||
-spec open(
|
||||
emqx_types:zone(),
|
||||
rocksdb:db_handle(),
|
||||
emqx_replay_local_store:gen_id(),
|
||||
emqx_replay_local_store:cf_refs(),
|
||||
schema()
|
||||
) ->
|
||||
db().
|
||||
open(DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||
open(Zone, DBHandle, GenId, CFs, #schema{keymapper = Keymapper}) ->
|
||||
{value, {_, CFHandle}} = lists:keysearch(data_cf(GenId), 1, CFs),
|
||||
#db{
|
||||
zone = Zone,
|
||||
handle = DBHandle,
|
||||
cf = CFHandle,
|
||||
keymapper = Keymapper
|
||||
|
@ -274,41 +293,51 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic,
|
|||
rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options).
|
||||
|
||||
-spec make_iterator(db(), emqx_topic:words(), time() | earliest) ->
|
||||
{ok, iterator()} | {error, _TODO}.
|
||||
make_iterator(DB, TopicFilter, StartTime) ->
|
||||
Options = emqx_replay_conf:zone_iteration_options(DB#db.zone),
|
||||
make_iterator(DB, TopicFilter, StartTime, Options).
|
||||
|
||||
-spec make_iterator(db(), emqx_topic:words(), time() | earliest, iteration_options()) ->
|
||||
% {error, invalid_start_time}? might just start from the beginning of time
|
||||
% and call it a day: client violated the contract anyway.
|
||||
{ok, iterator()} | {error, _TODO}.
|
||||
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime) ->
|
||||
make_iterator(DB = #db{handle = DBHandle, cf = CFHandle}, TopicFilter, StartTime, Options) ->
|
||||
case rocksdb:iterator(DBHandle, CFHandle, DB#db.read_options) of
|
||||
{ok, ITHandle} ->
|
||||
% TODO earliest
|
||||
Filter = make_keyspace_filter(TopicFilter, StartTime, DB#db.keymapper),
|
||||
InitialSeek = combine(compute_initial_seek(Filter), <<>>, DB#db.keymapper),
|
||||
RefreshCounter = make_refresh_counter(maps:get(iterator_refresh, Options, undefined)),
|
||||
{ok, #it{
|
||||
handle = ITHandle,
|
||||
filter = Filter,
|
||||
next_action = {seek, InitialSeek}
|
||||
next_action = {seek, InitialSeek},
|
||||
refresh_counter = RefreshCounter
|
||||
}};
|
||||
Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec next(iterator()) -> {value, binary(), iterator()} | none | {error, closed}.
|
||||
next(It = #it{filter = #filter{keymapper = Keymapper}}) ->
|
||||
next(It0 = #it{filter = #filter{keymapper = Keymapper}}) ->
|
||||
It = maybe_refresh_iterator(It0),
|
||||
case rocksdb:iterator_move(It#it.handle, It#it.next_action) of
|
||||
% spec says `{ok, Key}` is also possible but the implementation says it's not
|
||||
{ok, Key, Value} ->
|
||||
% Preserve last seen key in the iterator so it could be restored / refreshed later.
|
||||
ItNext = It#it{cursor = Key},
|
||||
Bitstring = extract(Key, Keymapper),
|
||||
case match_next(Bitstring, Value, It#it.filter) of
|
||||
{_Topic, Payload} ->
|
||||
% Preserve last seen key in the iterator so it could be restored later.
|
||||
{value, Payload, It#it{cursor = Key, next_action = next}};
|
||||
{value, Payload, ItNext#it{next_action = next}};
|
||||
next ->
|
||||
next(It#it{next_action = next});
|
||||
next(ItNext#it{next_action = next});
|
||||
NextBitstring when is_integer(NextBitstring) ->
|
||||
NextSeek = combine(NextBitstring, <<>>, Keymapper),
|
||||
next(It#it{next_action = {seek, NextSeek}});
|
||||
next(ItNext#it{next_action = {seek, NextSeek}});
|
||||
none ->
|
||||
stop_iteration(It)
|
||||
stop_iteration(ItNext)
|
||||
end;
|
||||
{error, invalid_iterator} ->
|
||||
stop_iteration(It);
|
||||
|
@ -347,13 +376,30 @@ restore_iterator(DB, #{
|
|||
Err
|
||||
end.
|
||||
|
||||
-spec refresh_iterator(iterator()) -> iterator().
|
||||
refresh_iterator(It = #it{handle = Handle, cursor = Cursor, next_action = Action}) ->
|
||||
case rocksdb:iterator_refresh(Handle) of
|
||||
ok when Action =:= next ->
|
||||
% Now the underlying iterator is invalid, need to seek instead.
|
||||
It#it{next_action = {seek, successor(Cursor)}};
|
||||
ok ->
|
||||
% Now the underlying iterator is invalid, but will seek soon anyway.
|
||||
It;
|
||||
{error, _} ->
|
||||
% Implementation could in theory return an {error, ...} tuple.
|
||||
% Supposedly our best bet is to ignore it.
|
||||
% TODO logging?
|
||||
It
|
||||
end.
|
||||
|
||||
%%================================================================================
|
||||
%% Internal exports
|
||||
%%================================================================================
|
||||
|
||||
-spec bitsize(keymapper()) -> bits().
|
||||
bitsize(#keymapper{bitsize = Bitsize}) ->
|
||||
Bitsize.
|
||||
-spec keymapper_info(keymapper()) ->
|
||||
#{source := [bitsource()], bitsize := bits(), epoch := time()}.
|
||||
keymapper_info(#keymapper{source = Source, bitsize = Bitsize, epoch = Epoch}) ->
|
||||
#{source => Source, bitsize => Bitsize, epoch => Epoch}.
|
||||
|
||||
make_message_key(Topic, PublishedAt, MessageID, Keymapper) ->
|
||||
combine(compute_bitstring(Topic, PublishedAt, Keymapper), MessageID, Keymapper).
|
||||
|
@ -687,166 +733,18 @@ substring(I, Offset, Size) ->
|
|||
data_cf(GenId) ->
|
||||
?MODULE_STRING ++ integer_to_list(GenId).
|
||||
|
||||
make_refresh_counter({every, N}) when is_integer(N), N > 0 ->
|
||||
{0, N};
|
||||
make_refresh_counter(undefined) ->
|
||||
undefined.
|
||||
|
||||
maybe_refresh_iterator(It = #it{refresh_counter = {N, N}}) ->
|
||||
refresh_iterator(It#it{refresh_counter = {0, N}});
|
||||
maybe_refresh_iterator(It = #it{refresh_counter = {M, N}}) ->
|
||||
It#it{refresh_counter = {M + 1, N}};
|
||||
maybe_refresh_iterator(It = #it{refresh_counter = undefined}) ->
|
||||
It.
|
||||
|
||||
stop_iteration(It) ->
|
||||
ok = rocksdb:iterator_close(It#it.handle),
|
||||
none.
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
make_keymapper_test_() ->
|
||||
[
|
||||
?_assertEqual(
|
||||
#keymapper{
|
||||
source = [
|
||||
{timestamp, 9, 23},
|
||||
{hash, level, 2},
|
||||
{hash, level, 4},
|
||||
{hash, levels, 8},
|
||||
{timestamp, 0, 9}
|
||||
],
|
||||
bitsize = 46,
|
||||
epoch = 512
|
||||
},
|
||||
make_keymapper(#{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => [2, 4, 8],
|
||||
epoch => 1000
|
||||
})
|
||||
),
|
||||
?_assertEqual(
|
||||
#keymapper{
|
||||
source = [
|
||||
{timestamp, 0, 32},
|
||||
{hash, levels, 16}
|
||||
],
|
||||
bitsize = 48,
|
||||
epoch = 1
|
||||
},
|
||||
make_keymapper(#{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => [16],
|
||||
epoch => 1
|
||||
})
|
||||
)
|
||||
].
|
||||
|
||||
compute_test_bitmask(TopicFilter) ->
|
||||
compute_topic_bitmask(
|
||||
TopicFilter,
|
||||
[
|
||||
{hash, level, 3},
|
||||
{hash, level, 4},
|
||||
{hash, level, 5},
|
||||
{hash, levels, 2}
|
||||
],
|
||||
0
|
||||
).
|
||||
|
||||
bitmask_test_() ->
|
||||
[
|
||||
?_assertEqual(
|
||||
2#111_1111_11111_11,
|
||||
compute_test_bitmask([<<"foo">>, <<"bar">>])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_0000_11111_11,
|
||||
compute_test_bitmask([<<"foo">>, '+'])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_0000_00000_11,
|
||||
compute_test_bitmask([<<"foo">>, '+', '+'])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_0000_11111_00,
|
||||
compute_test_bitmask([<<"foo">>, '+', <<"bar">>, '+'])
|
||||
)
|
||||
].
|
||||
|
||||
wildcard_bitmask_test_() ->
|
||||
[
|
||||
?_assertEqual(
|
||||
2#000_0000_00000_00,
|
||||
compute_test_bitmask(['#'])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_0000_00000_00,
|
||||
compute_test_bitmask([<<"foo">>, '#'])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_1111_11111_00,
|
||||
compute_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'])
|
||||
),
|
||||
?_assertEqual(
|
||||
2#111_1111_11111_11,
|
||||
compute_test_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'])
|
||||
)
|
||||
].
|
||||
|
||||
%% Filter = |123|***|678|***|
|
||||
%% Mask = |123|***|678|***|
|
||||
%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000|
|
||||
%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000|
|
||||
%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
|
||||
%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
|
||||
|
||||
compute_test_topic_seek(Bitstring, Bitfilter, HBitmask) ->
|
||||
compute_topic_seek(
|
||||
Bitstring,
|
||||
Bitfilter,
|
||||
HBitmask,
|
||||
[
|
||||
{hash, level, 8},
|
||||
{hash, level, 8},
|
||||
{hash, level, 16},
|
||||
{hash, levels, 12}
|
||||
],
|
||||
8 + 8 + 16 + 12
|
||||
).
|
||||
|
||||
next_seek_test_() ->
|
||||
[
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_topic_seek(
|
||||
16#FD_42_4242_043,
|
||||
16#FD_42_4242_042,
|
||||
16#FF_FF_FFFF_FFF
|
||||
)
|
||||
),
|
||||
?_assertMatch(
|
||||
16#FD_11_0678_000,
|
||||
compute_test_topic_seek(
|
||||
16#FD_11_0108_121,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
)
|
||||
),
|
||||
?_assertMatch(
|
||||
16#FD_12_0678_000,
|
||||
compute_test_topic_seek(
|
||||
16#FD_11_0679_919,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
)
|
||||
),
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_topic_seek(
|
||||
16#FD_FF_0679_001,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
)
|
||||
),
|
||||
?_assertMatch(
|
||||
none,
|
||||
compute_test_topic_seek(
|
||||
16#FE_11_0179_017,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000
|
||||
)
|
||||
)
|
||||
].
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_replay_storage_SUITE).
|
||||
-module(emqx_replay_local_store_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
@ -164,7 +164,10 @@ init_per_testcase(TC, Config) ->
|
|||
ok = set_zone_config(zone(TC), #{
|
||||
timestamp_bits => 64,
|
||||
topic_bits_per_level => [8, 8, 32, 16],
|
||||
epoch => 5
|
||||
epoch => 5,
|
||||
iteration => #{
|
||||
iterator_refresh => {every, 5}
|
||||
}
|
||||
}),
|
||||
{ok, _} = emqx_replay_local_store_sup:start_zone(zone(TC)),
|
||||
Config.
|
|
@ -0,0 +1,200 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_replay_message_storage_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
-import(emqx_replay_message_storage, [
|
||||
make_keymapper/1,
|
||||
keymapper_info/1,
|
||||
compute_topic_bitmask/2,
|
||||
compute_time_bitmask/1,
|
||||
compute_topic_seek/4
|
||||
]).
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_make_keymapper(_) ->
|
||||
?assertMatch(
|
||||
#{
|
||||
source := [
|
||||
{timestamp, 9, 23},
|
||||
{hash, level, 2},
|
||||
{hash, level, 4},
|
||||
{hash, levels, 8},
|
||||
{timestamp, 0, 9}
|
||||
],
|
||||
bitsize := 46,
|
||||
epoch := 512
|
||||
},
|
||||
keymapper_info(
|
||||
make_keymapper(#{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => [2, 4, 8],
|
||||
epoch => 1000
|
||||
})
|
||||
)
|
||||
).
|
||||
|
||||
t_make_keymapper_single_hash_level(_) ->
|
||||
?assertMatch(
|
||||
#{
|
||||
source := [
|
||||
{timestamp, 0, 32},
|
||||
{hash, levels, 16}
|
||||
],
|
||||
bitsize := 48,
|
||||
epoch := 1
|
||||
},
|
||||
keymapper_info(
|
||||
make_keymapper(#{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => [16],
|
||||
epoch => 1
|
||||
})
|
||||
)
|
||||
).
|
||||
|
||||
t_make_keymapper_no_timestamp(_) ->
|
||||
?assertMatch(
|
||||
#{
|
||||
source := [
|
||||
{hash, level, 4},
|
||||
{hash, level, 8},
|
||||
{hash, levels, 16}
|
||||
],
|
||||
bitsize := 28,
|
||||
epoch := 1
|
||||
},
|
||||
keymapper_info(
|
||||
make_keymapper(#{
|
||||
timestamp_bits => 0,
|
||||
topic_bits_per_level => [4, 8, 16],
|
||||
epoch => 42
|
||||
})
|
||||
)
|
||||
).
|
||||
|
||||
t_compute_topic_bitmask(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
|
||||
?assertEqual(
|
||||
2#111_1111_11111_11,
|
||||
compute_topic_bitmask([<<"foo">>, <<"bar">>], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_0000_11111_11,
|
||||
compute_topic_bitmask([<<"foo">>, '+'], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_0000_00000_11,
|
||||
compute_topic_bitmask([<<"foo">>, '+', '+'], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_0000_11111_00,
|
||||
compute_topic_bitmask([<<"foo">>, '+', <<"bar">>, '+'], KM)
|
||||
).
|
||||
|
||||
t_compute_topic_bitmask_wildcard(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
|
||||
?assertEqual(
|
||||
2#000_0000_00000_00,
|
||||
compute_topic_bitmask(['#'], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_0000_00000_00,
|
||||
compute_topic_bitmask([<<"foo">>, '#'], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_1111_11111_00,
|
||||
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, '#'], KM)
|
||||
).
|
||||
|
||||
t_compute_topic_bitmask_wildcard_long_tail(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [3, 4, 5, 2], timestamp_bits => 0, epoch => 1}),
|
||||
?assertEqual(
|
||||
2#111_1111_11111_11,
|
||||
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, <<"xyzzy">>], KM)
|
||||
),
|
||||
?assertEqual(
|
||||
2#111_1111_11111_00,
|
||||
compute_topic_bitmask([<<"foo">>, <<"bar">>, <<"baz">>, <<>>, '#'], KM)
|
||||
).
|
||||
|
||||
t_compute_time_bitmask(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 200}),
|
||||
?assertEqual(2#111_000000_1111111, compute_time_bitmask(KM)).
|
||||
|
||||
t_compute_time_bitmask_epoch_only(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [1, 2, 3], timestamp_bits => 10, epoch => 1}),
|
||||
?assertEqual(2#1111111111_000000, compute_time_bitmask(KM)).
|
||||
|
||||
%% Filter = |123|***|678|***|
|
||||
%% Mask = |123|***|678|***|
|
||||
%% Key1 = |123|011|108|121| → Seek = 0 |123|011|678|000|
|
||||
%% Key2 = |123|011|679|919| → Seek = 0 |123|012|678|000|
|
||||
%% Key3 = |123|999|679|001| → Seek = 1 |123|000|678|000| → eos
|
||||
%% Key4 = |125|011|179|017| → Seek = 1 |123|000|678|000| → eos
|
||||
|
||||
t_compute_next_topic_seek(_) ->
|
||||
KM = make_keymapper(#{topic_bits_per_level => [8, 8, 16, 12], timestamp_bits => 0, epoch => 1}),
|
||||
?assertMatch(
|
||||
none,
|
||||
compute_topic_seek(
|
||||
16#FD_42_4242_043,
|
||||
16#FD_42_4242_042,
|
||||
16#FF_FF_FFFF_FFF,
|
||||
KM
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
16#FD_11_0678_000,
|
||||
compute_topic_seek(
|
||||
16#FD_11_0108_121,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000,
|
||||
KM
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
16#FD_12_0678_000,
|
||||
compute_topic_seek(
|
||||
16#FD_11_0679_919,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000,
|
||||
KM
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
none,
|
||||
compute_topic_seek(
|
||||
16#FD_FF_0679_001,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000,
|
||||
KM
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
none,
|
||||
compute_topic_seek(
|
||||
16#FE_11_0179_017,
|
||||
16#FD_00_0678_000,
|
||||
16#FF_00_FFFF_000,
|
||||
KM
|
||||
)
|
||||
).
|
|
@ -28,13 +28,14 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
prop_bitstring_computes() ->
|
||||
?FORALL(Keymapper, keymapper(), begin
|
||||
Bitsize = emqx_replay_message_storage:bitsize(Keymapper),
|
||||
?FORALL(
|
||||
Keymapper,
|
||||
keymapper(),
|
||||
?FORALL({Topic, Timestamp}, {topic(), integer()}, begin
|
||||
BS = emqx_replay_message_storage:compute_bitstring(Topic, Timestamp, Keymapper),
|
||||
is_integer(BS) andalso (BS < (1 bsl Bitsize))
|
||||
is_integer(BS) andalso (BS < (1 bsl get_keymapper_bitsize(Keymapper)))
|
||||
end)
|
||||
end).
|
||||
).
|
||||
|
||||
prop_topic_bitmask_computes() ->
|
||||
Keymapper = make_keymapper(16, [8, 12, 16], 100),
|
||||
|
@ -56,7 +57,7 @@ prop_next_seek_monotonic() ->
|
|||
),
|
||||
?FORALL(
|
||||
Bitstring,
|
||||
bitstr(emqx_replay_message_storage:bitsize(Keymapper)),
|
||||
bitstr(get_keymapper_bitsize(Keymapper)),
|
||||
emqx_replay_message_storage:compute_next_seek(Bitstring, Filter) >= Bitstring
|
||||
)
|
||||
end
|
||||
|
@ -150,6 +151,41 @@ prop_iterate_eq_iterate_with_preserve_restore() ->
|
|||
)
|
||||
end).
|
||||
|
||||
prop_iterate_eq_iterate_with_refresh() ->
|
||||
TBPL = [4, 8, 16, 12],
|
||||
Options = #{
|
||||
timestamp_bits => 32,
|
||||
topic_bits_per_level => TBPL,
|
||||
epoch => 500
|
||||
},
|
||||
{DB, _Handle} = open_db(make_filepath(?FUNCTION_NAME), Options),
|
||||
?FORALL(Stream, non_empty(messages(topic(TBPL))), begin
|
||||
% TODO
|
||||
% This proptest is also impure, see above.
|
||||
ok = store_db(DB, Stream),
|
||||
?FORALL(
|
||||
{
|
||||
{Topic, _},
|
||||
Pat,
|
||||
StartTime,
|
||||
RefreshEvery
|
||||
},
|
||||
{
|
||||
nth(Stream),
|
||||
topic_filter_pattern(),
|
||||
start_time(),
|
||||
pos_integer()
|
||||
},
|
||||
?TIMEOUT(5000, begin
|
||||
TopicFilter = make_topic_filter(Pat, Topic),
|
||||
IterationOptions = #{iterator_refresh => {every, RefreshEvery}},
|
||||
Iterator = make_iterator(DB, TopicFilter, StartTime, IterationOptions),
|
||||
Messages = iterate_db(Iterator),
|
||||
equals(Messages, iterate_db(DB, TopicFilter, StartTime))
|
||||
end)
|
||||
)
|
||||
end).
|
||||
|
||||
% store_message_stream(DB, [{Topic, {Payload, ChunkNum, _ChunkCount}} | Rest]) ->
|
||||
% MessageID = emqx_guid:gen(),
|
||||
% PublishedAt = ChunkNum,
|
||||
|
@ -184,6 +220,10 @@ make_iterator(DB, TopicFilter, StartTime) ->
|
|||
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime),
|
||||
It.
|
||||
|
||||
make_iterator(DB, TopicFilter, StartTime, Options) ->
|
||||
{ok, It} = emqx_replay_message_storage:make_iterator(DB, TopicFilter, StartTime, Options),
|
||||
It.
|
||||
|
||||
run_iterator_commands([iterate | Rest], It, DB) ->
|
||||
case emqx_replay_message_storage:next(It) of
|
||||
{value, Payload, ItNext} ->
|
||||
|
@ -397,6 +437,9 @@ make_keymapper(TimestampBits, TopicBits, MaxEpoch) ->
|
|||
epoch => MaxEpoch
|
||||
}).
|
||||
|
||||
get_keymapper_bitsize(Keymapper) ->
|
||||
maps:get(bitsize, emqx_replay_message_storage:keymapper_info(Keymapper)).
|
||||
|
||||
-spec interleave(list({Tag, list(E)}), rand:state()) -> list({Tag, E}).
|
||||
interleave(Seqs, Rng) ->
|
||||
interleave(Seqs, length(Seqs), Rng).
|
||||
|
|
Loading…
Reference in New Issue