From c91df2f5cd917335f973224ed6e32ef177ad28f5 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 2 Oct 2023 23:04:43 +0200 Subject: [PATCH] refactor(ds): Create a prototype of replication layer --- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- .../emqx_persistent_session_ds_proto_v1.erl | 17 +- apps/emqx_durable_storage/IMPLEMENTATION.md | 42 ---- apps/emqx_durable_storage/README.md | 9 +- apps/emqx_durable_storage/src/emqx_ds.erl | 199 ++++++++++-------- apps/emqx_durable_storage/src/emqx_ds.erl_ | 189 +++++++++++++++++ apps/emqx_durable_storage/src/emqx_ds_lts.erl | 3 +- .../src/emqx_ds_message_storage_bitmask.erl | 15 +- .../src/emqx_ds_replication_layer.erl | 128 +++++++++++ .../src/emqx_ds_storage_layer.erl | 27 ++- .../src/proto/emqx_ds_proto_v1.erl | 56 +++++ 11 files changed, 539 insertions(+), 148 deletions(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds.erl_ create mode 100644 apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl create mode 100644 apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index e456211fc..174a02156 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -74,7 +74,7 @@ ]). %% FIXME --define(DS_SHARD_ID, <<"local">>). +-define(DS_SHARD_ID, atom_to_binary(node())). -define(DEFAULT_KEYSPACE, default). -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). diff --git a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl index d35ccd963..edaaea775 100644 --- a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -23,7 +23,8 @@ open_iterator/4, close_iterator/2, - close_all_iterators/2 + close_all_iterators/2, + get_streams/5 ]). 
-include_lib("emqx/include/bpapi.hrl"). @@ -50,6 +51,20 @@ open_iterator(Nodes, TopicFilter, StartMS, IteratorID) -> ?TIMEOUT ). +-spec get_streams( + node(), + emqx_ds:keyspace(), + emqx_ds:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time()) -> + [emqx_ds_storage_layer:stream()]. +get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime) -> + erpc:call( + Node, + emqx_ds_storage_layer, + get_streams, + [Keyspace, ShardId, TopicFilter, StartTime]). + -spec close_iterator( [node()], emqx_ds:iterator_id() diff --git a/apps/emqx_durable_storage/IMPLEMENTATION.md b/apps/emqx_durable_storage/IMPLEMENTATION.md index 9c0c5928c..33f02dfc4 100644 --- a/apps/emqx_durable_storage/IMPLEMENTATION.md +++ b/apps/emqx_durable_storage/IMPLEMENTATION.md @@ -31,48 +31,6 @@ Read pattern: pseudoserial Number of records: O(total write throughput * retention time) -## Session storage - -Data there is updated when: - -- A new client connects with clean session = false -- Client subscribes to a topic -- Client unsubscribes to a topic -- Garbage collection is performed - -Write throughput: low - -Data is read when a client connects and replay agents are started - -Read throughput: low - -Data format: - -`#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}` - -Number of records: O(N clients) - -Size of record: O(N subscriptions per clients) - -## Iterator storage - -Data is written every time a client acks a message. - -Data is read when a client reconnects and we restart replay agents. - -`#iterator{key = IterKey, data = Blob}` - -Number of records: O(N clients * N subscriptions per client) - -Size of record: O(1) - -Write throughput: high, lots of small updates - -Write pattern: mostly key overwrite - -Read throughput: low - -Read pattern: random # Push vs. 
Pull model diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index 7de43bee0..f01af0c37 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -1,9 +1,10 @@ # EMQX Replay -`emqx_ds` is a durable storage for MQTT messages within EMQX. -It implements the following scenarios: -- Persisting messages published by clients -- +`emqx_ds` is a generic durable storage for MQTT messages within EMQX. + +Concepts: + + > 0. App overview introduction > 1. let people know what your project can do specifically. Is it a base diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index feaa37bc0..ad6a07330 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,48 +15,29 @@ %%-------------------------------------------------------------------- -module(emqx_ds). --include_lib("stdlib/include/ms_transform.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). +%% Management API: +-export([create_db/2]). -%% API: --export([ensure_shard/2]). -%% Messages: --export([message_store/2, message_store/1, message_stats/0]). -%% Iterator: --export([iterator_update/2, iterator_next/1, iterator_stats/0]). +%% Message storage API: +-export([message_store/3, message_store/2]). + +%% Message replay API: +-export([get_streams/3, open_iterator/2, next/2]). %% internal exports: -export([]). --export_type([ - keyspace/0, - message_id/0, - message_stats/0, - message_store_opts/0, - replay/0, - replay_id/0, - iterator_id/0, - iterator/0, - shard/0, - shard_id/0, - topic/0, - topic_filter/0, - time/0 -]). +-export_type([db/0, time/0, topic_filter/0, topic/0]). %%================================================================================ %% Type declarations %%================================================================================ --type iterator() :: term(). - --type iterator_id() :: binary(). 
-
--type message_store_opts() :: #{}.
-
--type message_stats() :: #{}.
-
--type message_id() :: binary().
+%% Different DBs are completely independent from each other. They
+%% could represent something like different tenants.
+%%
+%% Topics stored in different DBs aren't necessarily disjoint.
+-type db() :: binary().
 
 %% Parsed topic.
 -type topic() :: list(binary()).
@@ -64,9 +45,30 @@
 %% Parsed topic filter.
 -type topic_filter() :: list(binary() | '+' | '#' | '').
 
--type keyspace() :: atom().
--type shard_id() :: binary().
--type shard() :: {keyspace(), shard_id()}.
+%% This record encapsulates the stream entity from the replication
+%% level.
+%%
+%% TODO: currently the stream is hardwired to only support the
+%% internal rocksdb storage. In the future we want to add other
+%% implementations for emqx_ds, so this type has to take this into
+%% account.
+-record(stream,
+        { shard :: emqx_ds_replication_layer:shard()
+        , enc :: emqx_ds_replication_layer:stream()
+        }).
+
+-type stream_rank() :: {integer(), integer()}.
+
+-opaque stream() :: #stream{}.
+
+%% This record encapsulates the iterator entity from the replication
+%% level.
+-record(iterator,
+        { shard :: emqx_ds_replication_layer:shard()
+        , enc :: emqx_ds_replication_layer:iterator()
+        }).
+
+-opaque iterator() :: #iterator{}.
 
 %% Timestamp
 %% Earliest possible timestamp is 0.
@@ -74,70 +76,89 @@
 %% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
 -type time() :: non_neg_integer().
 
--type replay_id() :: binary().
+-type message_store_opts() :: #{}.
 
--type replay() :: {
-    _TopicFilter :: topic_filter(),
-    _StartTime :: time()
-}.
+-type create_db_opts() :: #{}.
+
+-type message_id() :: binary().
 
 %%================================================================================
 %% API funcions
 %%================================================================================
 
--spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
-    ok | {error, _Reason}.
-ensure_shard(Shard, Options) -> - case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of - {ok, _Pid} -> - ok; - {error, {already_started, _Pid}} -> - ok; - {error, Reason} -> - {error, Reason} +-spec create_db(db(), create_db_opts()) -> ok. +create_db(DB, Opts) -> + emqx_ds_replication_layer:create_db(DB, Opts). + +-spec message_store(db(), [emqx_types:message()], message_store_opts()) -> + {ok, [message_id()]} | {error, _}. +message_store(DB, Msgs, Opts) -> + emqx_ds_replication_layer:message_store(DB, Msgs, Opts). + +-spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. +message_store(DB, Msgs) -> + message_store(DB, Msgs, #{}). + +%% @doc Get a list of streams needed for replaying a topic filter. +%% +%% Motivation: under the hood, EMQX may store different topics at +%% different locations or even in different databases. A wildcard +%% topic filter may require pulling data from any number of locations. +%% +%% Stream is an abstraction exposed by `emqx_ds' that reflects the +%% notion that different topics can be stored differently, but hides +%% the implementation details. +%% +%% Rules: +%% +%% 1. New streams matching the topic filter can appear without notice, +%% so the replayer must periodically call this function to get the +%% updated list of streams. +%% +%% 2. Streams may depend on one another. Therefore, care should be +%% taken while replaying them in parallel to avoid out-of-order +%% replay. This function returns stream together with its +%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is +%% different, then they can be replayed in parallel. If it's the +%% same, then the stream with smaller T coordinate should be replayed +%% first. +-spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. 
+get_streams(DB, TopicFilter, StartTime) -> + Shards = emqx_ds_replication_layer:list_shards(DB), + lists:flatmap( + fun(Shard) -> + Streams = emqx_ds_replication_layer:get_streams(Shard, TopicFilter, StartTime), + [{Rank, #stream{ shard = Shard + , enc = I + }} || {Rank, I} <- Streams] + end, + Shards). + +-spec open_iterator(stream(), time()) -> {ok, iterator()} | {error, _}. +open_iterator(#stream{shard = Shard, enc = Stream}, StartTime) -> + case emqx_ds_replication_layer:open_iterator(Shard, Stream, StartTime) of + {ok, Iter} -> + {ok, #iterator{shard = Shard, enc = Iter}}; + Err = {error, _} -> + Err end. -%%-------------------------------------------------------------------------------- -%% Message -%%-------------------------------------------------------------------------------- --spec message_store([emqx_types:message()], message_store_opts()) -> - {ok, [message_id()]} | {error, _}. -message_store(_Msg, _Opts) -> - %% TODO - {error, not_implemented}. +-spec next(iterator(), non_neg_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. +next(#iterator{shard = Shard, enc = Iter0}, BatchSize) -> + case emqx_ds_replication_layer:next(Shard, Iter0, BatchSize) of + {ok, Iter, Batch} -> + {ok, #iterator{shard = Shard, enc = Iter}, Batch}; + end_of_stream -> + end_of_stream + end. --spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. -message_store(Msg) -> - %% TODO - message_store(Msg, #{}). +%%================================================================================ +%% behavior callbacks +%%================================================================================ --spec message_stats() -> message_stats(). -message_stats() -> - #{}. 
- -%%-------------------------------------------------------------------------------- -%% Session -%%-------------------------------------------------------------------------------- - -%%-------------------------------------------------------------------------------- -%% Iterator (pull API) -%%-------------------------------------------------------------------------------- - -%% @doc Called when a client acks a message --spec iterator_update(iterator_id(), iterator()) -> ok. -iterator_update(_IterId, _Iter) -> - %% TODO - ok. - -%% @doc Called when a client acks a message --spec iterator_next(iterator()) -> {value, emqx_types:message(), iterator()} | none | {error, _}. -iterator_next(_Iter) -> - %% TODO - none. - --spec iterator_stats() -> #{}. -iterator_stats() -> - #{}. +%%================================================================================ +%% Internal exports +%%================================================================================ %%================================================================================ %% Internal functions diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl_ b/apps/emqx_durable_storage/src/emqx_ds.erl_ new file mode 100644 index 000000000..61b4c4bb3 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds.erl_ @@ -0,0 +1,189 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% API:
+-export([ensure_shard/2]).
+%% Messages:
+-export([message_store/2, message_store/1, message_stats/0]).
+%% Iterator:
+-export([get_streams/3, open_iterator/1, next/2]).
+
+%% internal exports:
+-export([]).
+
+-export_type([
+    stream/0,
+    keyspace/0,
+    message_id/0,
+    message_stats/0,
+    message_store_opts/0,
+    replay/0,
+    replay_id/0,
+    %iterator_id/0,
+    iterator/0,
+    topic/0,
+    topic_filter/0,
+    time/0
+]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%% This record encapsulates the stream entity from the storage level.
+%%
+%% TODO: currently the stream is hardwired to only support the
+%% internal rocksdb storage. In the future we want to add other
+%% implementations for emqx_ds, so this type has to take this into
+%% account.
+-record(stream,
+        { shard :: shard()
+        , stream :: emqx_ds_storage_layer:stream()
+        }).
+
+-opaque stream() :: #stream{}.
+
+-type iterator() :: term().
+
+%-type iterator_id() :: binary().
+
+-type message_store_opts() :: #{}.
+
+-type message_stats() :: #{}.
+
+-type message_id() :: binary().
+
+%% Parsed topic.
+-type topic() :: list(binary()).
+
+%% Parsed topic filter.
+-type topic_filter() :: list(binary() | '+' | '#' | '').
+
+-type keyspace() :: atom().
+-type shard_id() :: binary().
+-type shard() :: {keyspace(), shard_id()}.
+
+%% Timestamp
+%% Earliest possible timestamp is 0.
+%% TODO granularity? Currently, we should always use micro second, as that's the unit we
+%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
+-type time() :: non_neg_integer(). + +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: topic_filter(), + _StartTime :: time() +}. + +%%================================================================================ +%% API funcions +%%================================================================================ + +%% @doc Get a list of streams needed for replaying a topic filter. +%% +%% Motivation: under the hood, EMQX may store different topics at +%% different locations or even in different databases. A wildcard +%% topic filter may require pulling data from any number of locations. +%% +%% Stream is an abstraction exposed by `emqx_ds' that reflects the +%% notion that different topics can be stored differently, but hides +%% the implementation details. +%% +%% Rules: +%% +%% 1. New streams matching the topic filter can appear without notice, +%% so the replayer must periodically call this function to get the +%% updated list of streams. +%% +%% 2. Streams may depend on one another. Therefore, care should be +%% taken while replaying them in parallel to avoid out-of-order +%% replay. This function returns stream together with its +%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is +%% different, then they can be replayed in parallel. If it's the +%% same, then the stream with smaller T coordinate should be replayed +%% first. +-spec get_streams(keyspace(), topic_filter(), time()) -> [{integer(), integer(), stream()}]. 
+get_streams(Keyspace, TopicFilter, StartTime) ->
+    ShardIds = emqx_ds_replication_layer:get_all_shards(Keyspace),
+    lists:flatmap(
+        fun(ShardId) ->
+            Node = emqx_ds_replication_layer:shard_to_node(ShardId),
+            try
+                Streams = emqx_persistent_session_ds_proto_v1:get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime),
+                [#stream{ shard = {Keyspace, ShardId}
+                        , stream = Stream
+                        } || Stream <- Streams]
+            catch
+                error:{erpc, _} ->
+                    %% The caller has to periodically refresh the
+                    %% list of streams anyway, so it's ok to ignore
+                    %% transient errors.
+                    []
+            end
+        end,
+        ShardIds).
+
+-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
+    ok | {error, _Reason}.
+ensure_shard(Shard, Options) ->
+    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
+        {ok, _Pid} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------------------
+%% Message
+%%--------------------------------------------------------------------------------
+
+-spec message_store([emqx_types:message()], message_store_opts()) ->
+    {ok, [message_id()]} | {error, _}.
+message_store(_Msg, _Opts) ->
+    {error, not_implemented}.
+
+-spec message_store([emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
+message_store(Msg) ->
+    message_store(Msg, #{}).
+
+-spec message_stats() -> message_stats().
+message_stats() ->
+    #{}.
+
+%%--------------------------------------------------------------------------------
+%% Iterator (pull API)
+%%--------------------------------------------------------------------------------
+
+-spec open_iterator(stream()) -> {ok, iterator()}.
+open_iterator(#stream{shard = {_Keyspace, _ShardId}, stream = _StorageSpecificStream}) ->
+    error(todo).
+
+-spec next(iterator(), non_neg_integer()) ->
+    {ok, iterator(), [emqx_types:message()]}
+    | end_of_stream.
+next(_Iterator, _BatchSize) ->
+    error(todo).
+ +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 384677d21..9d206ee81 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -458,7 +458,8 @@ topic_match_test() -> [{S2_12, [<<"1">>]}, {S2_1_, [<<"1">>, <<"2">>]}]), assert_match_topics(T, [2, '#'], - [{S21, []}, {S22, []}, {S2_, ['+']}, + [{S21, []}, {S22, []}, + {S2_, ['+']}, {S2_11, ['+']}, {S2_12, ['+']}, {S2_1_, ['+', '+']}]), ok diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index be8a207bb..f51d556f1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -85,9 +85,9 @@ -export([store/5]). -export([delete/4]). --export([make_iterator/2]). --export([make_iterator/3]). --export([next/1]). + +-export([get_streams/2]). +-export([make_iterator/2, make_iterator/3, next/1]). -export([preserve_iterator/1]). -export([restore_iterator/2]). @@ -112,7 +112,7 @@ compute_topic_seek/4 ]). --export_type([db/0, iterator/0, schema/0]). +-export_type([db/0, stream/0, iterator/0, schema/0]). -export_type([options/0]). -export_type([iteration_options/0]). @@ -131,6 +131,8 @@ %% Type declarations %%================================================================================ +-opaque stream() :: singleton_stream. + -type topic() :: emqx_ds:topic(). -type topic_filter() :: emqx_ds:topic_filter(). -type time() :: emqx_ds:time(). 
@@ -288,6 +290,11 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
     Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
     rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
 
+-spec get_streams(db(), emqx_ds:replay()) ->
+    [stream()].
+get_streams(_, _) ->
+    [singleton_stream].
+
 -spec make_iterator(db(), emqx_ds:replay()) ->
     {ok, iterator()} | {error, _TODO}.
 make_iterator(DB, Replay) ->
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
new file mode 100644
index 000000000..9fe08e0a2
--- /dev/null
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -0,0 +1,128 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_replication_layer).
+
+-export([
+          list_shards/1,
+          create_db/2,
+          message_store/3,
+          get_streams/3,
+          open_iterator/3,
+          next/3
+        ]).
+
+
+%% internal exports:
+-export([ do_create_shard_v1/2,
+          do_get_streams_v1/3,
+          do_open_iterator_v1/3,
+          do_next_v1/3
+        ]).
+
+-export_type([shard/0, stream/0, iterator/0]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-opaque stream() :: emqx_ds_storage_layer:stream().
+
+-type shard() :: binary().
+
+-opaque iterator() :: emqx_ds_storage_layer:iterator().
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec list_shards(emqx_ds:db()) -> [shard()].
+list_shards(DB) ->
+    %% TODO: milestone 5
+    lists:map(
+        fun(Node) ->
+            term_to_binary({DB, Node})
+        end,
+        list_nodes()).
+
+-spec create_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok.
+create_db(DB, Opts) ->
+    lists:foreach(
+        fun(Node) ->
+            Shard = term_to_binary({DB, Node}),
+            emqx_ds_proto_v1:create_shard(Node, Shard, Opts)
+        end,
+        list_nodes()).
+
+-spec message_store(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
+    {ok, [emqx_ds:message_id()]} | {error, _}.
+message_store(DB, Msg, Opts) ->
+    %% TODO: milestone 5. Currently we store messages locally.
+    Shard = term_to_binary({DB, node()}),
+    emqx_ds_storage_layer:message_store(Shard, Msg, Opts).
+
+-spec get_streams(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}].
+get_streams(Shard, TopicFilter, StartTime) ->
+    Node = node_of_shard(Shard),
+    emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime).
+
+-spec open_iterator(shard(), stream(), emqx_ds:time()) -> {ok, iterator()} | {error, _}.
+open_iterator(Shard, Stream, StartTime) ->
+    Node = node_of_shard(Shard),
+    emqx_ds_proto_v1:open_iterator(Node, Shard, Stream, StartTime).
+
+-spec next(shard(), iterator(), non_neg_integer()) ->
+    {ok, iterator(), [emqx_types:message()]} | end_of_stream.
+next(Shard, Iter, BatchSize) ->
+    Node = node_of_shard(Shard),
+    emqx_ds_proto_v1:next(Node, Shard, Iter, BatchSize).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+%%================================================================================
+%% Internal exports (RPC targets)
+%%================================================================================
+
+-spec do_create_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok.
+do_create_shard_v1(Shard, Opts) ->
+    error({todo, Shard, Opts}).
+
+-spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [{emqx_ds:stream_rank(), stream()}].
+do_get_streams_v1(Shard, TopicFilter, StartTime) ->
+    error({todo, Shard, TopicFilter, StartTime}).
+
+-spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) -> {ok, iterator()} | {error, _}.
+do_open_iterator_v1(Shard, Stream, StartTime) ->
+    error({todo, Shard, Stream, StartTime}).
+
+-spec do_next_v1(shard(), iterator(), non_neg_integer()) ->
+    {ok, iterator(), [emqx_types:message()]} | end_of_stream.
+do_next_v1(Shard, Iter, BatchSize) ->
+    error({todo, Shard, Iter, BatchSize}).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+-spec node_of_shard(shard()) -> node().
+node_of_shard(ShardId) ->
+    {_DB, Node} = binary_to_term(ShardId),
+    Node.
+
+list_nodes() ->
+    mria:running_nodes().
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
index 25a58950d..7a96cab51 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
@@ -9,6 +9,7 @@
 
 -export([start_link/2]).
 -export([create_generation/3]).
+-export([get_streams/3]).
 -export([store/5]).
 -export([delete/4]).
 
@@ -27,7 +28,7 @@
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
--export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
+-export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
 -export_type([db_options/0, db_write_options/0, db_read_options/0]).
 
 -compile({inline, [meta_lookup/2]}).
@@ -36,6 +37,8 @@
 %% Type declarations
 %%================================================================================
 
+-opaque stream() :: {term()}.
+
 -type options() :: #{
     dir => file:filename()
 }.
@@ -114,10 +117,10 @@
     cf_refs(),
     _Schema
 ) ->
-    term().
+    _DB.
 
 -callback store(
-    _Schema,
+    _DB,
     _MessageID :: binary(),
     emqx_ds:time(),
     emqx_ds:topic(),
@@ -125,13 +128,16 @@
 ) ->
     ok | {error, _}.
 
--callback delete(_Schema, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
+-callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
     ok | {error, _}.
 
--callback make_iterator(_Schema, emqx_ds:replay()) ->
+-callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [_Stream].
+
+-callback make_iterator(_DB, emqx_ds:replay()) ->
     {ok, _It} | {error, _}.
 
--callback restore_iterator(_Schema, _Serialized :: binary()) -> {ok, _It} | {error, _}.
+-callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}.
 
 -callback preserve_iterator(_It) -> term().
 
@@ -146,6 +152,15 @@
 start_link(Shard = {Keyspace, ShardId}, Options) ->
     gen_server:start_link(?REF(Keyspace, ShardId), ?MODULE, {Shard, Options}, []).
 
+-spec get_streams(emqx_ds:shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [stream()].
+get_streams(Shard, TopicFilter, StartTime) ->
+    %% FIXME: messages can be potentially stored in multiple generations.
+    %% This function should return the results from all of them!
+    %% Otherwise we could LOSE messages when generations are switched.
+    %% NOTE(review): assumes the generation map keeps backend state under `data' -- confirm.
+    {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, StartTime),
+    Mod:get_streams(ModState, TopicFilter, StartTime).
+
 -spec create_generation(
     emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
 ) ->
diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl
new file mode 100644
index 000000000..f5d802003
--- /dev/null
+++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl
@@ -0,0 +1,56 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v1).
+
+-behavior(emqx_bpapi).
+
+%% API:
+-export([create_shard/3, get_streams/4, open_iterator/4, next/4]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API funcions
+%%================================================================================
+
+-spec create_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) ->
+    ok.
+create_shard(Node, Shard, Opts) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_create_shard_v1, [Shard, Opts]).
+
+-spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) ->
+    [{emqx_ds:stream_rank(), emqx_ds_replication_layer:stream()}].
+get_streams(Node, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
+
+-spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) ->
+    {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
+open_iterator(Node, Shard, Stream, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]).
+
+-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), non_neg_integer()) ->
+    {ok, emqx_ds_replication_layer:iterator(), [emqx_types:message()]} | end_of_stream.
+next(Node, Shard, Iter, BatchSize) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    %% FIXME
+    "5.3.0".