feat(ds): Introduce emqx_ds behavior

This commit is contained in:
ieQu1 2023-11-09 14:01:53 +01:00
parent 5aeb1feada
commit 8dc8237331
12 changed files with 164 additions and 115 deletions

View File

@ -161,7 +161,7 @@ fetch(_SessionId, Inflight, _Streams, 0, Acc) ->
fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
#inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0,
ItBegin = get_last_iterator(SessionId, Stream, Ranges0),
{ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N),
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
{NMessages, Publishes, Inflight1} =
lists:foldl(
fun(Msg, {N0, PubAcc0, InflightAcc0}) ->

View File

@ -105,8 +105,6 @@
-export_type([id/0]).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
%%
-spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
@ -497,8 +495,6 @@ storage() ->
%% @doc Called when a client connects. This function looks up a
%% session or returns `false` if previous one couldn't be found.
%%
%% This function also spawns replay agents for each iterator.
%%
%% Note: session API doesn't handle session takeovers, it's the job of
%% the broker.
-spec session_open(id()) ->
@ -670,7 +666,9 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
ok;
false ->
mnesia:write(?SESSION_STREAM_TAB, Rec, write),
{ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
{ok, Iterator} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),
IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
mnesia:write(?SESSION_ITER_TAB, IterRec, write)
end

View File

@ -16,6 +16,8 @@
-ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL).
-define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true).
-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
-define(SESSION_TAB, emqx_ds_session).
-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
-define(SESSION_STREAM_TAB, emqx_ds_stream_tab).

View File

@ -256,14 +256,14 @@ consume(TopicFilter, StartMS) ->
Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS),
lists:flatmap(
fun({_Rank, Stream}) ->
{ok, It} = emqx_ds:make_iterator(Stream, TopicFilter, StartMS),
{ok, It} = emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartMS),
consume(It)
end,
Streams
).
consume(It) ->
case emqx_ds:next(It, 100) of
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of
{ok, _NIt, _Msgs = []} ->
[];
{ok, NIt, Msgs} ->

View File

@ -133,7 +133,7 @@ get_listener_port(Type, Name) ->
end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic ->
ok = emqx_cth_suite:stop(?config(group_apps, Config));
end_per_group(_, _Config) ->
ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
ok.
init_per_testcase(TestCase, Config) ->

View File

@ -28,7 +28,7 @@
-export([store_batch/2, store_batch/3]).
%% Message replay API:
-export([get_streams/3, make_iterator/3, next/2]).
-export([get_streams/3, make_iterator/4, next/3]).
%% Misc. API:
-export([]).
@ -100,6 +100,26 @@
-type get_iterator_result(Iterator) :: {ok, Iterator} | undefined.
-define(persistent_term(DB), {emqx_ds_db_backend, DB}).
-define(module(DB), (persistent_term:get(?persistent_term(DB)))).
%%================================================================================
%% Behavior callbacks
%%================================================================================
-callback open_db(db(), create_db_opts()) -> ok | {error, _}.
-callback drop_db(db()) -> ok | {error, _}.
-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
-callback get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
-callback make_iterator(db(), _Stream, topic_filter(), time()) -> make_iterator_result(_Iterator).
-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator).
%%================================================================================
%% API funcions
%%================================================================================
@ -107,19 +127,29 @@
%% @doc Different DBs are completely independent from each other. They
%% could represent something like different tenants.
-spec open_db(db(), create_db_opts()) -> ok.
open_db(DB, Opts = #{backend := builtin}) ->
emqx_ds_replication_layer:open_db(DB, Opts).
open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin ->
Module =
case Backend of
builtin -> emqx_ds_replication_layer
end,
persistent_term:put(?persistent_term(DB), Module),
?module(DB):open_db(DB, Opts).
%% @doc TODO: currently if one or a few shards are down, they won't be
%% deleted.
-spec drop_db(db()) -> ok.
drop_db(DB) ->
emqx_ds_replication_layer:drop_db(DB).
case persistent_term:get(?persistent_term(DB), undefined) of
undefined ->
ok;
Module ->
Module:drop_db(DB)
end.
-spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
store_batch(DB, Msgs, Opts) ->
emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).
?module(DB):store_batch(DB, Msgs, Opts).
-spec store_batch(db(), [emqx_types:message()]) -> store_batch_result().
store_batch(DB, Msgs) ->
@ -168,15 +198,15 @@ store_batch(DB, Msgs) ->
%% replaying streams that depend on the given one.
-spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
get_streams(DB, TopicFilter, StartTime) ->
emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime).
?module(DB):get_streams(DB, TopicFilter, StartTime).
-spec make_iterator(stream(), topic_filter(), time()) -> make_iterator_result().
make_iterator(Stream, TopicFilter, StartTime) ->
emqx_ds_replication_layer:make_iterator(Stream, TopicFilter, StartTime).
-spec make_iterator(db(), stream(), topic_filter(), time()) -> make_iterator_result().
make_iterator(DB, Stream, TopicFilter, StartTime) ->
?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime).
-spec next(iterator(), pos_integer()) -> next_result().
next(Iter, BatchSize) ->
emqx_ds_replication_layer:next(Iter, BatchSize).
-spec next(db(), iterator(), pos_integer()) -> next_result().
next(DB, Iter, BatchSize) ->
?module(DB):next(DB, Iter, BatchSize).
%%================================================================================
%% Internal exports

View File

@ -18,23 +18,25 @@
%% replication on their own.
-module(emqx_ds_replication_layer).
-behaviour(emqx_ds).
-export([
list_shards/1,
open_db/2,
drop_db/1,
store_batch/3,
get_streams/3,
make_iterator/3,
next/2
make_iterator/4,
next/3
]).
%% internal exports:
-export([
do_open_shard_v1/2,
do_drop_shard_v1/1,
do_get_streams_v1/3,
do_make_iterator_v1/4,
do_next_v1/3
do_open_shard_v1/3,
do_drop_shard_v1/2,
do_get_streams_v1/4,
do_make_iterator_v1/5,
do_next_v1/4
]).
-export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
@ -57,7 +59,7 @@
-type db() :: emqx_ds:db().
-type shard_id() :: {db(), atom()}.
-type shard_id() :: atom().
%% This enapsulates the stream entity from the replication level.
%%
@ -86,41 +88,36 @@
%%================================================================================
-spec list_shards(db()) -> [shard_id()].
list_shards(DB) ->
list_shards(_DB) ->
%% TODO: milestone 5
lists:map(
fun(Node) ->
shard_id(DB, Node)
end,
list_nodes()
).
list_nodes().
-spec open_db(db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
open_db(DB, Opts) ->
%% TODO: improve error reporting, don't just crash
lists:foreach(
fun(Node) ->
Shard = shard_id(DB, Node),
ok = emqx_ds_proto_v1:open_shard(Node, Shard, Opts)
fun(Shard) ->
Node = node_of_shard(DB, Shard),
ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts)
end,
list_nodes()
list_shards(DB)
).
-spec drop_db(db()) -> ok | {error, _}.
drop_db(DB) ->
lists:foreach(
fun(Node) ->
Shard = shard_id(DB, Node),
ok = emqx_ds_proto_v1:drop_shard(Node, Shard)
fun(Shard) ->
Node = node_of_shard(DB, Shard),
ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard)
end,
list_nodes()
list_shards(DB)
).
-spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(DB, Msg, Opts) ->
%% TODO: Currently we store messages locally.
Shard = shard_id(DB, node()),
Shard = {DB, node()},
emqx_ds_storage_layer:store_batch(Shard, Msg, Opts).
-spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@ -129,8 +126,8 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards = list_shards(DB),
lists:flatmap(
fun(Shard) ->
Node = node_of_shard(Shard),
Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime),
Node = node_of_shard(DB, Shard),
Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime),
lists:map(
fun({RankY, Stream}) ->
RankX = Shard,
@ -147,22 +144,22 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards
).
-spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()).
make_iterator(Stream, TopicFilter, StartTime) ->
make_iterator(DB, Stream, TopicFilter, StartTime) ->
#{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream,
Node = node_of_shard(Shard),
case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of
Node = node_of_shard(DB, Shard),
case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} ->
{ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}};
Err = {error, _} ->
Err
end.
-spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(Iter0, BatchSize) ->
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
next(DB, Iter0, BatchSize) ->
#{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0,
Node = node_of_shard(Shard),
Node = node_of_shard(DB, Shard),
%% TODO: iterator can contain information that is useful for
%% reconstructing messages sent over the network. For example,
%% when we send messages with the learned topic index, we could
@ -171,7 +168,7 @@ next(Iter0, BatchSize) ->
%%
%% This kind of trickery should be probably done here in the
%% replication layer. Or, perhaps, in the logic layer.
case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of
{ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch};
@ -187,42 +184,49 @@ next(Iter0, BatchSize) ->
%% Internal exports (RPC targets)
%%================================================================================
-spec do_open_shard_v1(shard_id(), emqx_ds:create_db_opts()) -> ok.
do_open_shard_v1(Shard, Opts) ->
emqx_ds_storage_layer:open_shard(Shard, Opts).
-spec do_open_shard_v1(db(), emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
ok | {error, _}.
do_open_shard_v1(DB, Shard, Opts) ->
emqx_ds_storage_layer:open_shard({DB, Shard}, Opts).
-spec do_drop_shard_v1(shard_id()) -> ok.
do_drop_shard_v1(Shard) ->
emqx_ds_storage_layer:drop_shard(Shard).
-spec do_drop_shard_v1(db(), emqx_ds_storage_layer:shard_id()) -> ok | {error, _}.
do_drop_shard_v1(DB, Shard) ->
emqx_ds_storage_layer:drop_shard({DB, Shard}).
-spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
-spec do_get_streams_v1(
emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
[{integer(), emqx_ds_storage_layer:stream()}].
do_get_streams_v1(Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
-spec do_make_iterator_v1(
shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time()
emqx_ds:db(),
emqx_ds_storage_layer:shard_id(),
emqx_ds_storage_layer:stream(),
emqx_ds:topic_filter(),
emqx_ds:time()
) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime).
do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
-spec do_next_v1(shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()) ->
-spec do_next_v1(
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:iterator(),
pos_integer()
) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(Shard, Iter, BatchSize) ->
emqx_ds_storage_layer:next(Shard, Iter, BatchSize).
do_next_v1(DB, Shard, Iter, BatchSize) ->
emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
%%================================================================================
%% Internal functions
%%================================================================================
shard_id(DB, Node) ->
%% TODO: don't bake node name into the schema, don't repeat the
%% Mnesia's 1M$ mistake.
{DB, Node}.
-spec node_of_shard(shard_id()) -> node().
node_of_shard({_DB, Node}) ->
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
node_of_shard(_DB, Node) ->
Node.
list_nodes() ->

View File

@ -110,7 +110,7 @@
%%================================================================================
-spec create(
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:shard_id(),
rocksdb:db_handle(),
emqx_ds_storage_layer:gen_id(),
options()
@ -137,7 +137,7 @@ create(_ShardId, DBHandle, GenId, Options) ->
{Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}.
-spec open(
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:shard_id(),
rocksdb:db_handle(),
emqx_ds_storage_layer:gen_id(),
emqx_ds_storage_layer:cf_refs(),
@ -173,7 +173,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) ->
}.
-spec store_batch(
emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
) ->
emqx_ds:store_batch_result().
store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
@ -187,7 +187,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
).
-spec get_streams(
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:shard_id(),
s(),
emqx_ds:topic_filter(),
emqx_ds:time()
@ -197,7 +197,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
[#{?tag => ?stream, ?storage_key => I} || I <- Indexes].
-spec make_iterator(
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:shard_id(),
s(),
stream(),
emqx_ds:topic_filter(),

View File

@ -50,7 +50,7 @@
{emqx_ds_storage_reference, emqx_ds_storage_reference:options()}
| {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}.
-type shard_id() :: emqx_ds_replication_layer:shard_id().
-type shard_id() :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}.
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
@ -217,7 +217,7 @@ next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, Batch
-spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
{ok, pid()}.
start_link(Shard, Options) ->
start_link(Shard = {_, _}, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
-record(s, {
@ -417,11 +417,11 @@ generations_since(Shard, Since) ->
-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
-spec get_schema_runtime(shard_id()) -> shard().
get_schema_runtime(Shard) ->
get_schema_runtime(Shard = {_, _}) ->
persistent_term:get(?PERSISTENT_TERM(Shard)).
-spec put_schema_runtime(shard_id(), shard()) -> ok.
put_schema_runtime(Shard, RuntimeSchema) ->
put_schema_runtime(Shard = {_, _}, RuntimeSchema) ->
persistent_term:put(?PERSISTENT_TERM(Shard), RuntimeSchema),
ok.

View File

@ -25,7 +25,7 @@
start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_shard(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:startchild_ret().
start_shard(Shard, Options) ->
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
@ -33,9 +33,10 @@ start_shard(Shard, Options) ->
-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
stop_shard(Shard) ->
ok = supervisor:terminate_child(?SUP, Shard),
ok = supervisor:delete_child(?SUP, Shard).
Ok = supervisor:delete_child(?SUP, Shard).
-spec ensure_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}.
-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) ->
ok | {error, _Reason}.
ensure_shard(Shard, Options) ->
case start_shard(Shard, Options) of
{ok, _Pid} ->
@ -63,7 +64,7 @@ init([]) ->
%% Internal functions
%%================================================================================
-spec shard_child_spec(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
-spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) ->
supervisor:child_spec().
shard_child_spec(Shard, Options) ->
#{

View File

@ -19,7 +19,7 @@
-include_lib("emqx_utils/include/bpapi.hrl").
%% API:
-export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/5, next/4]).
-export([open_shard/4, drop_shard/3, get_streams/5, make_iterator/6, next/5]).
%% behavior callbacks:
-export([introduced_in/0]).
@ -28,44 +28,58 @@
%% API funcions
%%================================================================================
-spec open_shard(node(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) ->
-spec open_shard(
node(),
emqx_ds_replication_layer:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds:create_db_opts()
) ->
ok.
open_shard(Node, Shard, Opts) ->
erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).
open_shard(Node, DB, Shard, Opts) ->
erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]).
-spec drop_shard(node(), emqx_ds_replication_layer:shard_id()) ->
-spec drop_shard(node(), emqx_ds_replication_layer:db(), emqx_ds_replication_layer:shard_id()) ->
ok.
drop_shard(Node, Shard) ->
erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]).
drop_shard(Node, DB, Shard) ->
erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]).
-spec get_streams(
node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
node(),
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds:topic_filter(),
emqx_ds:time()
) ->
[{integer(), emqx_ds_storage_layer:stream()}].
get_streams(Node, Shard, TopicFilter, Time) ->
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
get_streams(Node, DB, Shard, TopicFilter, Time) ->
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
-spec make_iterator(
node(),
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:stream(),
emqx_ds:topic_filter(),
emqx_ds:time()
) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _}.
make_iterator(Node, Shard, Stream, TopicFilter, StartTime) ->
make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [
Shard, Stream, TopicFilter, StartTime
DB, Shard, Stream, TopicFilter, StartTime
]).
-spec next(
node(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()
node(),
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds_storage_layer:iterator(),
pos_integer()
) ->
{ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]}
| {ok, end_of_stream}
| {error, _}.
next(Node, Shard, Iter, BatchSize) ->
erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).
next(Node, DB, Shard, Iter, BatchSize) ->
erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
%%================================================================================
%% behavior callbacks

View File

@ -54,7 +54,7 @@ t_02_smoke_get_streams_start_iter(_Config) ->
TopicFilter = ['#'],
[{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
?assertMatch({_, _}, Rank),
?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, TopicFilter, StartTime)).
?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)).
%% A simple smoke test that verifies that it's possible to iterate
%% over messages.
@ -70,8 +70,8 @@ t_03_smoke_iterate(_Config) ->
],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(Iter0, 1),
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
{ok, Iter, Batch} = iterate(DB, Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}).
%% Verify that iterators survive restart of the application. This is
@ -91,14 +91,14 @@ t_04_restart(_Config) ->
],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime),
{ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
%% Restart the application:
?tp(warning, emqx_ds_SUITE_restart_app, #{}),
ok = application:stop(emqx_durable_storage),
{ok, _} = application:ensure_all_started(emqx_durable_storage),
ok = emqx_ds:open_db(DB, opts()),
%% The old iterator should be still operational:
{ok, Iter, Batch} = iterate(Iter0, 1),
{ok, Iter, Batch} = iterate(DB, Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}).
message(Topic, Payload, PublishedAt) ->
@ -109,15 +109,15 @@ message(Topic, Payload, PublishedAt) ->
id = emqx_guid:gen()
}.
iterate(It, BatchSize) ->
iterate(It, BatchSize, []).
iterate(DB, It, BatchSize) ->
iterate(DB, It, BatchSize, []).
iterate(It0, BatchSize, Acc) ->
case emqx_ds:next(It0, BatchSize) of
iterate(DB, It0, BatchSize, Acc) ->
case emqx_ds:next(DB, It0, BatchSize) of
{ok, It, []} ->
{ok, It, Acc};
{ok, It, Msgs} ->
iterate(It, BatchSize, Acc ++ Msgs);
iterate(DB, It, BatchSize, Acc ++ Msgs);
Ret ->
Ret
end.