fix(ds): Pass topic filter to emqx_ds:make_iterator call

This commit is contained in:
ieQu1 2023-10-11 15:51:52 +02:00
parent 51a6f623fd
commit c149e0e2df
7 changed files with 53 additions and 49 deletions

View File

@ -28,7 +28,7 @@
-export([store_batch/2, store_batch/3]). -export([store_batch/2, store_batch/3]).
%% Message replay API: %% Message replay API:
-export([get_streams/3, make_iterator/2, next/2]). -export([get_streams/3, make_iterator/3, next/2]).
%% Misc. API: %% Misc. API:
-export([]). -export([]).
@ -159,9 +159,9 @@ store_batch(DB, Msgs) ->
get_streams(DB, TopicFilter, StartTime) -> get_streams(DB, TopicFilter, StartTime) ->
emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime). emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime).
-spec make_iterator(stream(), time()) -> make_iterator_result(). -spec make_iterator(stream(), topic_filter(), time()) -> make_iterator_result().
make_iterator(Stream, StartTime) -> make_iterator(Stream, TopicFilter, StartTime) ->
emqx_ds_replication_layer:make_iterator(Stream, StartTime). emqx_ds_replication_layer:make_iterator(Stream, TopicFilter, StartTime).
-spec next(iterator(), pos_integer()) -> next_result(). -spec next(iterator(), pos_integer()) -> next_result().
next(Iter, BatchSize) -> next(Iter, BatchSize) ->

View File

@ -334,29 +334,30 @@ extract_inv(Dest, #scan_action{
ones(Bits) -> ones(Bits) ->
1 bsl Bits - 1. 1 bsl Bits - 1.
%% Create a bitmask that is sufficient to cover a given number. E.g.:
%%
%% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111
bitmask_of(N) ->
%% FIXME: avoid floats
NBits = ceil(math:log2(N + 1)),
ones(NBits).
%%================================================================================ %%================================================================================
%% Unit tests %% Unit tests
%%================================================================================ %%================================================================================
-ifdef(TEST). -ifdef(TEST).
bitmask_of_test() -> %% %% Create a bitmask that is sufficient to cover a given number. E.g.:
?assertEqual(2#0, bitmask_of(0)), %% %%
?assertEqual(2#1, bitmask_of(1)), %% %% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111
?assertEqual(2#11, bitmask_of(2#10)), %% bitmask_of(N) ->
?assertEqual(2#11, bitmask_of(2#11)), %% %% FIXME: avoid floats
?assertEqual(2#1111, bitmask_of(2#1000)), %% NBits = ceil(math:log2(N + 1)),
?assertEqual(2#1111, bitmask_of(2#1111)), %% ones(NBits).
?assertEqual(ones(128), bitmask_of(ones(128))),
?assertEqual(ones(256), bitmask_of(ones(256))).
%% bitmask_of_test() ->
%% ?assertEqual(2#0, bitmask_of(0)),
%% ?assertEqual(2#1, bitmask_of(1)),
%% ?assertEqual(2#11, bitmask_of(2#10)),
%% ?assertEqual(2#11, bitmask_of(2#11)),
%% ?assertEqual(2#1111, bitmask_of(2#1000)),
%% ?assertEqual(2#1111, bitmask_of(2#1111)),
%% ?assertEqual(ones(128), bitmask_of(ones(128))),
%% ?assertEqual(ones(256), bitmask_of(ones(256))).
make_keymapper0_test() -> make_keymapper0_test() ->
Schema = [], Schema = [],

View File

@ -21,7 +21,7 @@
drop_db/1, drop_db/1,
store_batch/3, store_batch/3,
get_streams/3, get_streams/3,
make_iterator/2, make_iterator/3,
next/2 next/2
]). ]).
@ -30,7 +30,7 @@
do_open_shard_v1/2, do_open_shard_v1/2,
do_drop_shard_v1/1, do_drop_shard_v1/1,
do_get_streams_v1/3, do_get_streams_v1/3,
do_make_iterator_v1/3, do_make_iterator_v1/4,
do_next_v1/3 do_next_v1/3
]). ]).
@ -132,11 +132,11 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards Shards
). ).
-spec make_iterator(stream(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()).
make_iterator(Stream, StartTime) -> make_iterator(Stream, TopicFilter, StartTime) ->
#stream{shard = Shard, enc = StorageStream} = Stream, #stream{shard = Shard, enc = StorageStream} = Stream,
Node = node_of_shard(Shard), Node = node_of_shard(Shard),
case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, StartTime) of case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #iterator{shard = Shard, enc = Iter}}; {ok, #iterator{shard = Shard, enc = Iter}};
Err = {error, _} -> Err = {error, _} ->
@ -184,9 +184,9 @@ do_drop_shard_v1(Shard) ->
do_get_streams_v1(Shard, TopicFilter, StartTime) -> do_get_streams_v1(Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
-spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:time()) -> {ok, iterator()} | {error, _}. -spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> {ok, iterator()} | {error, _}.
do_make_iterator_v1(Shard, Stream, StartTime) -> do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator(Shard, Stream, StartTime). emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
-spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter). -spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter).
do_next_v1(Shard, Iter, BatchSize) -> do_next_v1(Shard, Iter, BatchSize) ->

View File

@ -18,7 +18,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% Replication layer API: %% Replication layer API:
-export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/3, next/3]). -export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]).
%% gen_server %% gen_server
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -112,7 +112,7 @@
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream]. [_Stream].
-callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:time()) -> -callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(_Iterator). emqx_ds:make_iterator_result(_Iterator).
-callback next(shard_id(), _Data, Iter, pos_integer()) -> -callback next(shard_id(), _Data, Iter, pos_integer()) ->
@ -158,11 +158,11 @@ get_streams(Shard, TopicFilter, StartTime) ->
Gens Gens
). ).
-spec make_iterator(shard_id(), stream(), emqx_ds:time()) -> -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
make_iterator(Shard, #stream{generation = GenId, enc = Stream}, StartTime) -> make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId), #{module := Mod, data := GenData} = generation_get(Shard, GenId),
case Mod:make_iterator(Shard, GenData, Stream, StartTime) of case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #it{ {ok, #it{
generation = GenId, generation = GenId,

View File

@ -27,7 +27,7 @@
-export([]). -export([]).
%% behavior callbacks: %% behavior callbacks:
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/4, next/4]). -export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]).
%% internal exports: %% internal exports:
-export([]). -export([]).
@ -49,7 +49,7 @@
cf :: rocksdb:cf_handle() cf :: rocksdb:cf_handle()
}). }).
-record(stream, {topic_filter :: emqx_ds:topic_filter()}). -record(stream, {}).
-record(it, { -record(it, {
topic_filter :: emqx_ds:topic_filter(), topic_filter :: emqx_ds:topic_filter(),
@ -86,10 +86,10 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
Messages Messages
). ).
get_streams(_Shard, _Data, TopicFilter, _StartTime) -> get_streams(_Shard, _Data, _TopicFilter, _StartTime) ->
[#stream{topic_filter = TopicFilter}]. [#stream{}].
make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) -> make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) ->
{ok, #it{ {ok, #it{
topic_filter = TopicFilter, topic_filter = TopicFilter,
start_time = StartTime start_time = StartTime

View File

@ -19,7 +19,7 @@
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
%% API: %% API:
-export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/4, next/4]). -export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/5, next/4]).
%% behavior callbacks: %% behavior callbacks:
-export([introduced_in/0]). -export([introduced_in/0]).
@ -45,10 +45,10 @@ drop_shard(Node, Shard) ->
get_streams(Node, Shard, TopicFilter, Time) -> get_streams(Node, Shard, TopicFilter, Time) ->
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
-spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:time()) -> -spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) ->
{ok, emqx_ds_replication_layer:iterator()} | {error, _}. {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
make_iterator(Node, Shard, Stream, StartTime) -> make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, StartTime]). erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, TopicFilter, StartTime]).
-spec next( -spec next(
node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer() node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()

View File

@ -45,9 +45,10 @@ t_02_smoke_get_streams_start_iter(_Config) ->
DB = ?FUNCTION_NAME, DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})), ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
StartTime = 0, StartTime = 0,
[{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), TopicFilter = ['#'],
[{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
?assertMatch({_, _}, Rank), ?assertMatch({_, _}, Rank),
?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, StartTime)). ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, TopicFilter, StartTime)).
%% A simple smoke test that verifies that it's possible to iterate %% A simple smoke test that verifies that it's possible to iterate
%% over messages. %% over messages.
@ -55,14 +56,15 @@ t_03_smoke_iterate(_Config) ->
DB = ?FUNCTION_NAME, DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})), ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
StartTime = 0, StartTime = 0,
TopicFilter = ['#'],
Msgs = [ Msgs = [
message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo">>, <<"2">>, 1), message(<<"foo">>, <<"2">>, 1),
message(<<"bar/bar">>, <<"3">>, 2) message(<<"bar/bar">>, <<"3">>, 2)
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(Iter0, 1), {ok, Iter, Batch} = iterate(Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}). ?assertEqual(Msgs, Batch, {Iter0, Iter}).
@ -74,6 +76,7 @@ t_03_smoke_iterate(_Config) ->
t_04_restart(_Config) -> t_04_restart(_Config) ->
DB = ?FUNCTION_NAME, DB = ?FUNCTION_NAME,
?assertMatch(ok, emqx_ds:open_db(DB, #{})), ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
TopicFilter = ['#'],
StartTime = 0, StartTime = 0,
Msgs = [ Msgs = [
message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo/bar">>, <<"1">>, 0),
@ -81,8 +84,8 @@ t_04_restart(_Config) ->
message(<<"bar/bar">>, <<"3">>, 2) message(<<"bar/bar">>, <<"3">>, 2)
], ],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
%% Restart the application: %% Restart the application:
?tp(warning, emqx_ds_SUITE_restart_app, #{}), ?tp(warning, emqx_ds_SUITE_restart_app, #{}),
ok = application:stop(emqx_durable_storage), ok = application:stop(emqx_durable_storage),