From c149e0e2df8d373cf09b472cadc8dc159426411a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 11 Oct 2023 15:51:52 +0200 Subject: [PATCH] fix(ds): Pass topic filter to emqx_ds:make_iterator call --- apps/emqx_durable_storage/src/emqx_ds.erl | 8 ++--- .../src/emqx_ds_bitmask_keymapper.erl | 35 ++++++++++--------- .../src/emqx_ds_replication_layer.erl | 16 ++++----- .../src/emqx_ds_storage_layer.erl | 10 +++--- .../src/emqx_ds_storage_reference.erl | 10 +++--- .../src/proto/emqx_ds_proto_v1.erl | 8 ++--- .../test/emqx_ds_SUITE.erl | 15 ++++---- 7 files changed, 53 insertions(+), 49 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index cf4b5a031..dd6af9a03 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -28,7 +28,7 @@ -export([store_batch/2, store_batch/3]). %% Message replay API: --export([get_streams/3, make_iterator/2, next/2]). +-export([get_streams/3, make_iterator/3, next/2]). %% Misc. API: -export([]). @@ -159,9 +159,9 @@ store_batch(DB, Msgs) -> get_streams(DB, TopicFilter, StartTime) -> emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime). --spec make_iterator(stream(), time()) -> make_iterator_result(). -make_iterator(Stream, StartTime) -> - emqx_ds_replication_layer:make_iterator(Stream, StartTime). +-spec make_iterator(stream(), topic_filter(), time()) -> make_iterator_result(). +make_iterator(Stream, TopicFilter, StartTime) -> + emqx_ds_replication_layer:make_iterator(Stream, TopicFilter, StartTime). -spec next(iterator(), pos_integer()) -> next_result(). next(Iter, BatchSize) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index 44f171b55..fd2d41946 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -334,29 +334,30 @@ extract_inv(Dest, #scan_action{ ones(Bits) -> 1 bsl Bits - 1. -%% Create a bitmask that is sufficient to cover a given number. E.g.: -%% -%% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111 -bitmask_of(N) -> - %% FIXME: avoid floats - NBits = ceil(math:log2(N + 1)), - ones(NBits). - %%================================================================================ %% Unit tests %%================================================================================ -ifdef(TEST). -bitmask_of_test() -> - ?assertEqual(2#0, bitmask_of(0)), - ?assertEqual(2#1, bitmask_of(1)), - ?assertEqual(2#11, bitmask_of(2#10)), - ?assertEqual(2#11, bitmask_of(2#11)), - ?assertEqual(2#1111, bitmask_of(2#1000)), - ?assertEqual(2#1111, bitmask_of(2#1111)), - ?assertEqual(ones(128), bitmask_of(ones(128))), - ?assertEqual(ones(256), bitmask_of(ones(256))). +%% %% Create a bitmask that is sufficient to cover a given number. E.g.: +%% %% +%% %% 2#1000 -> 2#1111; 2#0 -> 2#0; 2#10101 -> 2#11111 +%% bitmask_of(N) -> +%% %% FIXME: avoid floats +%% NBits = ceil(math:log2(N + 1)), +%% ones(NBits). + + +%% bitmask_of_test() -> +%% ?assertEqual(2#0, bitmask_of(0)), +%% ?assertEqual(2#1, bitmask_of(1)), +%% ?assertEqual(2#11, bitmask_of(2#10)), +%% ?assertEqual(2#11, bitmask_of(2#11)), +%% ?assertEqual(2#1111, bitmask_of(2#1000)), +%% ?assertEqual(2#1111, bitmask_of(2#1111)), +%% ?assertEqual(ones(128), bitmask_of(ones(128))), +%% ?assertEqual(ones(256), bitmask_of(ones(256))). make_keymapper0_test() -> Schema = [], diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index a28c9de52..aeb2ce646 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -21,7 +21,7 @@ drop_db/1, store_batch/3, get_streams/3, - make_iterator/2, + make_iterator/3, next/2 ]). @@ -30,7 +30,7 @@ do_open_shard_v1/2, do_drop_shard_v1/1, do_get_streams_v1/3, - do_make_iterator_v1/3, + do_make_iterator_v1/4, do_next_v1/3 ]). @@ -132,11 +132,11 @@ get_streams(DB, TopicFilter, StartTime) -> Shards ). --spec make_iterator(stream(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -make_iterator(Stream, StartTime) -> +-spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). +make_iterator(Stream, TopicFilter, StartTime) -> #stream{shard = Shard, enc = StorageStream} = Stream, Node = node_of_shard(Shard), - case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, StartTime) of + case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #iterator{shard = Shard, enc = Iter}}; Err = {error, _} -> @@ -184,9 +184,9 @@ do_drop_shard_v1(Shard) -> do_get_streams_v1(Shard, TopicFilter, StartTime) -> emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). --spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:time()) -> {ok, iterator()} | {error, _}. -do_make_iterator_v1(Shard, Stream, StartTime) -> - emqx_ds_storage_layer:make_iterator(Shard, Stream, StartTime). +-spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> {ok, iterator()} | {error, _}. +do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> + emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). -spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter). do_next_v1(Shard, Iter, BatchSize) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e9d4edc06..744ac869f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). %% Replication layer API: --export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/3, next/3]). +-export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/4, next/3]). %% gen_server -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -112,7 +112,7 @@ -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. --callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:time()) -> +-callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(_Iterator). -callback next(shard_id(), _Data, Iter, pos_integer()) -> @@ -158,11 +158,11 @@ get_streams(Shard, TopicFilter, StartTime) -> Gens ). --spec make_iterator(shard_id(), stream(), emqx_ds:time()) -> +-spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -make_iterator(Shard, #stream{generation = GenId, enc = Stream}, StartTime) -> +make_iterator(Shard, #stream{generation = GenId, enc = Stream}, TopicFilter, StartTime) -> #{module := Mod, data := GenData} = generation_get(Shard, GenId), - case Mod:make_iterator(Shard, GenData, Stream, StartTime) of + case Mod:make_iterator(Shard, GenData, Stream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #it{ generation = GenId, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index fd480eeab..5a91f9ecd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -27,7 +27,7 @@ -export([]). %% behavior callbacks: --export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/4, next/4]). +-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/5, next/4]). %% internal exports: -export([]). @@ -49,7 +49,7 @@ cf :: rocksdb:cf_handle() }). --record(stream, {topic_filter :: emqx_ds:topic_filter()}). +-record(stream, {}). -record(it, { topic_filter :: emqx_ds:topic_filter(), @@ -86,10 +86,10 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> Messages ). -get_streams(_Shard, _Data, TopicFilter, _StartTime) -> - [#stream{topic_filter = TopicFilter}]. +get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> + [#stream{}]. -make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) -> +make_iterator(_Shard, _Data, #stream{}, TopicFilter, StartTime) -> {ok, #it{ topic_filter = TopicFilter, start_time = StartTime diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 60671cef7..d4d7b3631 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx/include/bpapi.hrl"). %% API: --export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/4, next/4]). +-export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/5, next/4]). %% behavior callbacks: -export([introduced_in/0]). @@ -45,10 +45,10 @@ drop_shard(Node, Shard) -> get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). --spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:time()) -> +-spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:topic_filter(), emqx_ds:time()) -> {ok, emqx_ds_replication_layer:iterator()} | {error, _}. -make_iterator(Node, Shard, Stream, StartTime) -> - erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, StartTime]). +make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, TopicFilter, StartTime]). -spec next( node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer() diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 1935e41cf..2dc77c563 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -45,9 +45,10 @@ t_02_smoke_get_streams_start_iter(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), StartTime = 0, - [{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), + TopicFilter = ['#'], + [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), ?assertMatch({_, _}, Rank), - ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, StartTime)). + ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, TopicFilter, StartTime)). %% A simple smoke test that verifies that it's possible to iterate %% over messages. @@ -55,14 +56,15 @@ t_03_smoke_iterate(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), StartTime = 0, + TopicFilter = ['#'], Msgs = [ message(<<"foo/bar">>, <<"1">>, 0), message(<<"foo">>, <<"2">>, 1), message(<<"bar/bar">>, <<"3">>, 2) ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - [{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), - {ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), + [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), {ok, Iter, Batch} = iterate(Iter0, 1), ?assertEqual(Msgs, Batch, {Iter0, Iter}). @@ -74,6 +76,7 @@ t_03_smoke_iterate(_Config) -> t_04_restart(_Config) -> DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + TopicFilter = ['#'], StartTime = 0, Msgs = [ message(<<"foo/bar">>, <<"1">>, 0), @@ -81,8 +84,8 @@ t_04_restart(_Config) -> message(<<"bar/bar">>, <<"3">>, 2) ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - [{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), - {ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), + [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), %% Restart the application: ?tp(warning, emqx_ds_SUITE_restart_app, #{}), ok = application:stop(emqx_durable_storage),