diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 1ef4ea293..156aa943e 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -161,7 +161,7 @@ fetch(_SessionId, Inflight, _Streams, 0, Acc) -> fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) -> #inflight{next_seqno = FirstSeqNo, offset_ranges = Ranges0} = Inflight0, ItBegin = get_last_iterator(SessionId, Stream, Ranges0), - {ok, ItEnd, Messages} = emqx_ds:next(ItBegin, N), + {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), {NMessages, Publishes, Inflight1} = lists:foldl( fun(Msg, {N0, PubAcc0, InflightAcc0}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b7b5d0df9..52c98c7d4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -105,8 +105,6 @@ -export_type([id/0]). --define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). - %% -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> @@ -497,8 +495,6 @@ storage() -> %% @doc Called when a client connects. This function looks up a %% session or returns `false` if previous one couldn't be found. %% -%% This function also spawns replay agents for each iterator. -%% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. -spec session_open(id()) -> @@ -670,7 +666,9 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> ok; false -> mnesia:write(?SESSION_STREAM_TAB, Rec, write), - {ok, Iterator} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), + {ok, Iterator} = emqx_ds:make_iterator( + ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime + ), IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator}, mnesia:write(?SESSION_ITER_TAB, IterRec, write) end diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 81b997df5..666874608 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -16,6 +16,8 @@ -ifndef(EMQX_PERSISTENT_SESSION_DS_HRL_HRL). -define(EMQX_PERSISTENT_SESSION_DS_HRL_HRL, true). +-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). + -define(SESSION_TAB, emqx_ds_session). -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). -define(SESSION_STREAM_TAB, emqx_ds_stream_tab). diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 52ba090b5..45cf85a05 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -256,14 +256,14 @@ consume(TopicFilter, StartMS) -> Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS), lists:flatmap( fun({_Rank, Stream}) -> - {ok, It} = emqx_ds:make_iterator(Stream, TopicFilter, StartMS), + {ok, It} = emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartMS), consume(It) end, Streams ). consume(It) -> - case emqx_ds:next(It, 100) of + case emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, 100) of {ok, _NIt, _Msgs = []} -> []; {ok, NIt, Msgs} -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 5a14e0bc9..0f8929e23 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -133,7 +133,7 @@ get_listener_port(Type, Name) -> end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic -> ok = emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(_, _Config) -> - ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), + catch emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), ok. init_per_testcase(TestCase, Config) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 27a0745bc..725d62673 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -28,7 +28,7 @@ -export([store_batch/2, store_batch/3]). %% Message replay API: --export([get_streams/3, make_iterator/3, next/2]). +-export([get_streams/3, make_iterator/4, next/3]). %% Misc. API: -export([]). @@ -100,6 +100,26 @@ -type get_iterator_result(Iterator) :: {ok, Iterator} | undefined. +-define(persistent_term(DB), {emqx_ds_db_backend, DB}). + +-define(module(DB), (persistent_term:get(?persistent_term(DB)))). + +%%================================================================================ +%% Behavior callbacks +%%================================================================================ + +-callback open_db(db(), create_db_opts()) -> ok | {error, _}. + +-callback drop_db(db()) -> ok | {error, _}. + +-callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). + +-callback get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. + +-callback make_iterator(db(), _Stream, topic_filter(), time()) -> make_iterator_result(_Iterator). + +-callback next(db(), Iterator, pos_integer()) -> next_result(Iterator). + %%================================================================================ %% API funcions %%================================================================================ @@ -107,19 +127,29 @@ %% @doc Different DBs are completely independent from each other. They %% could represent something like different tenants. -spec open_db(db(), create_db_opts()) -> ok. -open_db(DB, Opts = #{backend := builtin}) -> - emqx_ds_replication_layer:open_db(DB, Opts). +open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin -> + Module = + case Backend of + builtin -> emqx_ds_replication_layer + end, + persistent_term:put(?persistent_term(DB), Module), + ?module(DB):open_db(DB, Opts). %% @doc TODO: currently if one or a few shards are down, they won't be %% deleted. -spec drop_db(db()) -> ok. drop_db(DB) -> - emqx_ds_replication_layer:drop_db(DB). + case persistent_term:get(?persistent_term(DB), undefined) of + undefined -> + ok; + Module -> + Module:drop_db(DB) + end. -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). store_batch(DB, Msgs, Opts) -> - emqx_ds_replication_layer:store_batch(DB, Msgs, Opts). + ?module(DB):store_batch(DB, Msgs, Opts). -spec store_batch(db(), [emqx_types:message()]) -> store_batch_result(). store_batch(DB, Msgs) -> @@ -168,15 +198,15 @@ store_batch(DB, Msgs) -> %% replaying streams that depend on the given one. -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> - emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime). + ?module(DB):get_streams(DB, TopicFilter, StartTime). --spec make_iterator(stream(), topic_filter(), time()) -> make_iterator_result(). -make_iterator(Stream, TopicFilter, StartTime) -> - emqx_ds_replication_layer:make_iterator(Stream, TopicFilter, StartTime). +-spec make_iterator(db(), stream(), topic_filter(), time()) -> make_iterator_result(). +make_iterator(DB, Stream, TopicFilter, StartTime) -> + ?module(DB):make_iterator(DB, Stream, TopicFilter, StartTime). --spec next(iterator(), pos_integer()) -> next_result(). -next(Iter, BatchSize) -> - emqx_ds_replication_layer:next(Iter, BatchSize). +-spec next(db(), iterator(), pos_integer()) -> next_result(). +next(DB, Iter, BatchSize) -> + ?module(DB):next(DB, Iter, BatchSize). %%================================================================================ %% Internal exports diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index b81f43c4f..4a9240f95 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -18,23 +18,25 @@ %% replication on their own. -module(emqx_ds_replication_layer). +-behaviour(emqx_ds). + -export([ list_shards/1, open_db/2, drop_db/1, store_batch/3, get_streams/3, - make_iterator/3, - next/2 + make_iterator/4, + next/3 ]). %% internal exports: -export([ - do_open_shard_v1/2, - do_drop_shard_v1/1, - do_get_streams_v1/3, - do_make_iterator_v1/4, - do_next_v1/3 + do_open_shard_v1/3, + do_drop_shard_v1/2, + do_get_streams_v1/4, + do_make_iterator_v1/5, + do_next_v1/4 ]). -export_type([shard_id/0, stream/0, iterator/0, message_id/0]). @@ -57,7 +59,7 @@ -type db() :: emqx_ds:db(). --type shard_id() :: {db(), atom()}. +-type shard_id() :: atom(). %% This enapsulates the stream entity from the replication level. %% @@ -86,41 +88,36 @@ %%================================================================================ -spec list_shards(db()) -> [shard_id()]. -list_shards(DB) -> +list_shards(_DB) -> %% TODO: milestone 5 - lists:map( - fun(Node) -> - shard_id(DB, Node) - end, - list_nodes() - ). + list_nodes(). -spec open_db(db(), emqx_ds:create_db_opts()) -> ok | {error, _}. open_db(DB, Opts) -> %% TODO: improve error reporting, don't just crash lists:foreach( - fun(Node) -> - Shard = shard_id(DB, Node), - ok = emqx_ds_proto_v1:open_shard(Node, Shard, Opts) + fun(Shard) -> + Node = node_of_shard(DB, Shard), + ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts) end, - list_nodes() + list_shards(DB) ). -spec drop_db(db()) -> ok | {error, _}. drop_db(DB) -> lists:foreach( - fun(Node) -> - Shard = shard_id(DB, Node), - ok = emqx_ds_proto_v1:drop_shard(Node, Shard) + fun(Shard) -> + Node = node_of_shard(DB, Shard), + ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard) end, - list_nodes() + list_shards(DB) ). -spec store_batch(db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Msg, Opts) -> %% TODO: Currently we store messages locally. - Shard = shard_id(DB, node()), + Shard = {DB, node()}, emqx_ds_storage_layer:store_batch(Shard, Msg, Opts). -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -129,8 +126,8 @@ get_streams(DB, TopicFilter, StartTime) -> Shards = list_shards(DB), lists:flatmap( fun(Shard) -> - Node = node_of_shard(Shard), - Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime), + Node = node_of_shard(DB, Shard), + Streams = emqx_ds_proto_v1:get_streams(Node, DB, Shard, TopicFilter, StartTime), lists:map( fun({RankY, Stream}) -> RankX = Shard, @@ -147,22 +144,22 @@ get_streams(DB, TopicFilter, StartTime) -> Shards ). --spec make_iterator(stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> +-spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). -make_iterator(Stream, TopicFilter, StartTime) -> +make_iterator(DB, Stream, TopicFilter, StartTime) -> #{?tag := ?stream, ?shard := Shard, ?enc := StorageStream} = Stream, - Node = node_of_shard(Shard), - case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, TopicFilter, StartTime) of + Node = node_of_shard(DB, Shard), + case emqx_ds_proto_v1:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?it, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> Err end. --spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -next(Iter0, BatchSize) -> +-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). +next(DB, Iter0, BatchSize) -> #{?tag := ?it, ?shard := Shard, ?enc := StorageIter0} = Iter0, - Node = node_of_shard(Shard), + Node = node_of_shard(DB, Shard), %% TODO: iterator can contain information that is useful for %% reconstructing messages sent over the network. For example, %% when we send messages with the learned topic index, we could @@ -171,7 +168,7 @@ next(Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of + case emqx_ds_proto_v1:next(Node, DB, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; @@ -187,42 +184,49 @@ next(Iter0, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1(shard_id(), emqx_ds:create_db_opts()) -> ok. -do_open_shard_v1(Shard, Opts) -> - emqx_ds_storage_layer:open_shard(Shard, Opts). +-spec do_open_shard_v1(db(), emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> + ok | {error, _}. +do_open_shard_v1(DB, Shard, Opts) -> + emqx_ds_storage_layer:open_shard({DB, Shard}, Opts). --spec do_drop_shard_v1(shard_id()) -> ok. -do_drop_shard_v1(Shard) -> - emqx_ds_storage_layer:drop_shard(Shard). +-spec do_drop_shard_v1(db(), emqx_ds_storage_layer:shard_id()) -> ok | {error, _}. +do_drop_shard_v1(DB, Shard) -> + emqx_ds_storage_layer:drop_shard({DB, Shard}). --spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> +-spec do_get_streams_v1( + emqx_ds:db(), emqx_ds_replicationi_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() +) -> [{integer(), emqx_ds_storage_layer:stream()}]. -do_get_streams_v1(Shard, TopicFilter, StartTime) -> - emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). +do_get_streams_v1(DB, Shard, TopicFilter, StartTime) -> + emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime). -spec do_make_iterator_v1( - shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time() + emqx_ds:db(), + emqx_ds_storage_layer:shard_id(), + emqx_ds_storage_layer:stream(), + emqx_ds:topic_filter(), + emqx_ds:time() ) -> {ok, emqx_ds_storage_layer:iterator()} | {error, _}. -do_make_iterator_v1(Shard, Stream, TopicFilter, StartTime) -> - emqx_ds_storage_layer:make_iterator(Shard, Stream, TopicFilter, StartTime). +do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) -> + emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). --spec do_next_v1(shard_id(), emqx_ds_storage_layer:iterator(), pos_integer()) -> +-spec do_next_v1( + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() +) -> emqx_ds:next_result(emqx_ds_storage_layer:iterator()). -do_next_v1(Shard, Iter, BatchSize) -> - emqx_ds_storage_layer:next(Shard, Iter, BatchSize). +do_next_v1(DB, Shard, Iter, BatchSize) -> + emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). %%================================================================================ %% Internal functions %%================================================================================ -shard_id(DB, Node) -> - %% TODO: don't bake node name into the schema, don't repeat the - %% Mnesia's 1M$ mistake. - {DB, Node}. - --spec node_of_shard(shard_id()) -> node(). -node_of_shard({_DB, Node}) -> +-spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). +node_of_shard(_DB, Node) -> Node. list_nodes() -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index d2c997ae1..50b6af5b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -110,7 +110,7 @@ %%================================================================================ -spec create( - emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), options() @@ -137,7 +137,7 @@ create(_ShardId, DBHandle, GenId, Options) -> {Schema, [{DataCFName, DataCFHandle}, {TrieCFName, TrieCFHandle}]}. -spec open( - emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:shard_id(), rocksdb:db_handle(), emqx_ds_storage_layer:gen_id(), emqx_ds_storage_layer:cf_refs(), @@ -173,7 +173,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> }. -spec store_batch( - emqx_ds_replication_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() + emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> @@ -187,7 +187,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> ). -spec get_streams( - emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:shard_id(), s(), emqx_ds:topic_filter(), emqx_ds:time() @@ -197,7 +197,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) -> [#{?tag => ?stream, ?storage_key => I} || I <- Indexes]. -spec make_iterator( - emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:shard_id(), s(), stream(), emqx_ds:topic_filter(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index c91ac49d5..8c2e55510 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -50,7 +50,7 @@ {emqx_ds_storage_reference, emqx_ds_storage_reference:options()} | {emqx_ds_storage_bitfield_lts, emqx_ds_storage_bitfield_lts:options()}. --type shard_id() :: emqx_ds_replication_layer:shard_id(). +-type shard_id() :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}. -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. @@ -217,7 +217,7 @@ next(Shard, Iter = #{?tag := ?it, ?generation := GenId, ?enc := GenIter0}, Batch -spec start_link(shard_id(), emqx_ds:builtin_db_opts()) -> {ok, pid()}. -start_link(Shard, Options) -> +start_link(Shard = {_, _}, Options) -> gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). -record(s, { @@ -417,11 +417,11 @@ generations_since(Shard, Since) -> -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -spec get_schema_runtime(shard_id()) -> shard(). -get_schema_runtime(Shard) -> +get_schema_runtime(Shard = {_, _}) -> persistent_term:get(?PERSISTENT_TERM(Shard)). -spec put_schema_runtime(shard_id(), shard()) -> ok. -put_schema_runtime(Shard, RuntimeSchema) -> +put_schema_runtime(Shard = {_, _}, RuntimeSchema) -> persistent_term:put(?PERSISTENT_TERM(Shard), RuntimeSchema), ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index fac7204bf..174312c4e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -25,7 +25,7 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> +-spec start_shard(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> supervisor:startchild_ret(). start_shard(Shard, Options) -> supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). @@ -33,9 +33,10 @@ start_shard(Shard, Options) -> -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> ok = supervisor:terminate_child(?SUP, Shard), - ok = supervisor:delete_child(?SUP, Shard). + Ok = supervisor:delete_child(?SUP, Shard). --spec ensure_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> ok | {error, _Reason}. +-spec ensure_shard(emqx_ds_storage_layer:shard_id(), emqx_ds_storage_layer:options()) -> + ok | {error, _Reason}. ensure_shard(Shard, Options) -> case start_shard(Shard, Options) of {ok, _Pid} -> @@ -63,7 +64,7 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> +-spec shard_child_spec(emqx_ds_storage_layer:shard_id(), emqx_ds:create_db_opts()) -> supervisor:child_spec(). shard_child_spec(Shard, Options) -> #{ diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 6a79a4a61..c5fee4757 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/5, next/4]). +-export([open_shard/4, drop_shard/3, get_streams/5, make_iterator/6, next/5]). %% behavior callbacks: -export([introduced_in/0]). @@ -28,44 +28,58 @@ %% API funcions %%================================================================================ --spec open_shard(node(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts()) -> +-spec open_shard( + node(), + emqx_ds_replication_layer:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:create_db_opts() +) -> ok. -open_shard(Node, Shard, Opts) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]). +open_shard(Node, DB, Shard, Opts) -> + erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]). --spec drop_shard(node(), emqx_ds_replication_layer:shard_id()) -> +-spec drop_shard(node(), emqx_ds_replication_layer:db(), emqx_ds_replication_layer:shard_id()) -> ok. -drop_shard(Node, Shard) -> - erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]). +drop_shard(Node, DB, Shard) -> + erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]). -spec get_streams( - node(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() ) -> [{integer(), emqx_ds_storage_layer:stream()}]. -get_streams(Node, Shard, TopicFilter, Time) -> - erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). +get_streams(Node, DB, Shard, TopicFilter, Time) -> + erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]). -spec make_iterator( node(), + emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:stream(), emqx_ds:topic_filter(), emqx_ds:time() ) -> {ok, emqx_ds_storage_layer:iterator()} | {error, _}. -make_iterator(Node, Shard, Stream, TopicFilter, StartTime) -> +make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [ - Shard, Stream, TopicFilter, StartTime + DB, Shard, Stream, TopicFilter, StartTime ]). -spec next( - node(), emqx_ds_replication_layer:shard_id(), emqx_ds_storage_layer:iterator(), pos_integer() + node(), + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds_storage_layer:iterator(), + pos_integer() ) -> {ok, emqx_ds_storage_layer:iterator(), [emqx_types:messages()]} | {ok, end_of_stream} | {error, _}. -next(Node, Shard, Iter, BatchSize) -> - erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]). +next(Node, DB, Shard, Iter, BatchSize) -> + erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]). %%================================================================================ %% behavior callbacks diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 9637431d3..9b74e3227 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -54,7 +54,7 @@ t_02_smoke_get_streams_start_iter(_Config) -> TopicFilter = ['#'], [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), ?assertMatch({_, _}, Rank), - ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, TopicFilter, StartTime)). + ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)). %% A simple smoke test that verifies that it's possible to iterate %% over messages. @@ -70,8 +70,8 @@ t_03_smoke_iterate(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), - {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), - {ok, Iter, Batch} = iterate(Iter0, 1), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), + {ok, Iter, Batch} = iterate(DB, Iter0, 1), ?assertEqual(Msgs, Batch, {Iter0, Iter}). %% Verify that iterators survive restart of the application. This is @@ -91,14 +91,14 @@ t_04_restart(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), - {ok, Iter0} = emqx_ds:make_iterator(Stream, TopicFilter, StartTime), + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), %% Restart the application: ?tp(warning, emqx_ds_SUITE_restart_app, #{}), ok = application:stop(emqx_durable_storage), {ok, _} = application:ensure_all_started(emqx_durable_storage), ok = emqx_ds:open_db(DB, opts()), %% The old iterator should be still operational: - {ok, Iter, Batch} = iterate(Iter0, 1), + {ok, Iter, Batch} = iterate(DB, Iter0, 1), ?assertEqual(Msgs, Batch, {Iter0, Iter}). message(Topic, Payload, PublishedAt) -> @@ -109,15 +109,15 @@ message(Topic, Payload, PublishedAt) -> id = emqx_guid:gen() }. -iterate(It, BatchSize) -> - iterate(It, BatchSize, []). +iterate(DB, It, BatchSize) -> + iterate(DB, It, BatchSize, []). -iterate(It0, BatchSize, Acc) -> - case emqx_ds:next(It0, BatchSize) of +iterate(DB, It0, BatchSize, Acc) -> + case emqx_ds:next(DB, It0, BatchSize) of {ok, It, []} -> {ok, It, Acc}; {ok, It, Msgs} -> - iterate(It, BatchSize, Acc ++ Msgs); + iterate(DB, It, BatchSize, Acc ++ Msgs); Ret -> Ret end.