From c6a721a7eb431d5bd910bf84f23939f0caef744e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 3 Oct 2023 17:13:16 +0200 Subject: [PATCH] refactor(ds): Passthrough open_db and get_channels to storage layer --- ...l => emqx_persistent_session_ds_SUITE.erl} | 2 +- apps/emqx/src/emqx_persistent_message.erl | 10 +- ...ds.erl => emqx_persistent_session_ds.erl_} | 64 +- .../emqx_persistent_session_ds_proto_v1.erl | 22 +- apps/emqx_durable_storage/src/emqx_ds.erl | 155 ++- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 337 ++++--- .../src/emqx_ds_replication_layer.erl | 173 ++-- .../src/emqx_ds_storage_layer.erl | 893 ++++++------------ .../src/emqx_ds_storage_layer.erl_ | 714 ++++++++++++++ ...erl => emqx_ds_storage_layer_bitmask.erl_} | 18 +- .../src/emqx_ds_storage_layer_sup.erl | 2 +- .../src/emqx_ds_storage_reference.erl | 136 +++ .../src/proto/emqx_ds_proto_v1.erl | 33 +- .../test/emqx_ds_SUITE.erl | 107 +++ ...mqx_ds_message_storage_bitmask_SUITE.erl_} | 0 ...E.erl => emqx_ds_storage_layer_SUITE.erl_} | 0 scripts/check-elixir-applications.exs | 2 +- scripts/check-elixir-deps-discrepancies.exs | 2 +- ...elixir-emqx-machine-boot-discrepancies.exs | 2 +- scripts/check_missing_reboot_apps.exs | 2 +- 20 files changed, 1683 insertions(+), 991 deletions(-) rename apps/emqx/integration_test/{emqx_ds_SUITE.erl => emqx_persistent_session_ds_SUITE.erl} (99%) rename apps/emqx/src/{emqx_persistent_session_ds.erl => emqx_persistent_session_ds.erl_} (90%) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ rename apps/emqx_durable_storage/src/{emqx_ds_message_storage_bitmask.erl => emqx_ds_storage_layer_bitmask.erl_} (98%) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl create mode 100644 apps/emqx_durable_storage/test/emqx_ds_SUITE.erl rename apps/emqx_durable_storage/test/{emqx_ds_message_storage_bitmask_SUITE.erl => emqx_ds_message_storage_bitmask_SUITE.erl_} (100%) rename apps/emqx_durable_storage/test/{emqx_ds_storage_layer_SUITE.erl => emqx_ds_storage_layer_SUITE.erl_} (100%) diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl similarity index 99% rename from apps/emqx/integration_test/emqx_ds_SUITE.erl rename to apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 34c15b505..d2d23e8cd 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -1,7 +1,7 @@ %%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_ds_SUITE). +-module(emqx_persistent_session_ds_SUITE). -compile(export_all). -compile(nowarn_export_all). diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 96c767d7e..8801acce5 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -27,10 +27,6 @@ ]). %% FIXME --define(DS_SHARD_ID, <<"local">>). --define(DEFAULT_KEYSPACE, default). --define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). - -define(WHEN_ENABLED(DO), case is_store_enabled() of true -> DO; @@ -42,9 +38,9 @@ init() -> ?WHEN_ENABLED(begin - ok = emqx_ds:create_db(<<"default">>, #{}), + ok = emqx_ds:open_db(<<"default">>, #{}), ok = emqx_persistent_session_ds_router:init_tables(), - ok = emqx_persistent_session_ds:create_tables(), + %ok = emqx_persistent_session_ds:create_tables(), ok end). @@ -70,7 +66,7 @@ needs_persistence(Msg) -> not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). store_message(Msg) -> - emqx_ds:message_store([Msg]). + emqx_ds:store_batch([Msg]). has_subscribers(#message{topic = Topic}) -> emqx_persistent_session_ds_router:has_any_route(Topic). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl_ similarity index 90% rename from apps/emqx/src/emqx_persistent_session_ds.erl rename to apps/emqx/src/emqx_persistent_session_ds.erl_ index 174a02156..3fff5f7ba 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl_ @@ -62,22 +62,6 @@ -export([session_open/1]). -endif. -%% RPC --export([ - ensure_iterator_closed_on_all_shards/1, - ensure_all_iterators_closed/1 -]). --export([ - do_open_iterator/3, - do_ensure_iterator_closed/1, - do_ensure_all_iterators_closed/1 -]). - -%% FIXME --define(DS_SHARD_ID, atom_to_binary(node())). --define(DEFAULT_KEYSPACE, default). --define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). - %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). @@ -157,7 +141,6 @@ destroy(#{clientid := ClientID}) -> destroy_session(ClientID). destroy_session(ClientID) -> - _ = ensure_all_iterators_closed(ClientID), session_drop(ClientID). %%-------------------------------------------------------------------- @@ -410,9 +393,9 @@ open_iterator_on_all_shards(TopicFilter, Iterator) -> %% RPC target. -spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}. -do_open_iterator(TopicFilter, StartMS, IteratorID) -> - Replay = {TopicFilter, StartMS}, - emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay). +do_open_iterator(TopicFilter, StartMS, _IteratorID) -> + %% TODO: wrong + {ok, emqx_ds:make_iterator(TopicFilter, StartMS)}. -spec del_subscription(topic(), iterator(), id()) -> ok. @@ -420,49 +403,8 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) -> % N.B.: see comments in `?MODULE:add_subscription' for a discussion about the % order of operations here. TopicFilter = emqx_topic:words(TopicFilterBin), - Ctx = #{iterator_id => IteratorID}, - ?tp_span( - persistent_session_ds_close_iterators, - Ctx, - ok = ensure_iterator_closed_on_all_shards(IteratorID) - ), - ?tp_span( - persistent_session_ds_iterator_delete, - Ctx, - session_del_iterator(DSSessionID, TopicFilter) - ), ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID). --spec ensure_iterator_closed_on_all_shards(emqx_ds:iterator_id()) -> ok. -ensure_iterator_closed_on_all_shards(IteratorID) -> - %% Note: currently, shards map 1:1 to nodes, but this will change in the future. - Nodes = emqx:running_nodes(), - Results = emqx_persistent_session_ds_proto_v1:close_iterator(Nodes, IteratorID), - %% TODO: handle errors - true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), - ok. - -%% RPC target. --spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok. -do_ensure_iterator_closed(IteratorID) -> - ok = emqx_ds_storage_layer:discard_iterator(?DS_SHARD, IteratorID), - ok. - --spec ensure_all_iterators_closed(id()) -> ok. -ensure_all_iterators_closed(DSSessionID) -> - %% Note: currently, shards map 1:1 to nodes, but this will change in the future. - Nodes = emqx:running_nodes(), - Results = emqx_persistent_session_ds_proto_v1:close_all_iterators(Nodes, DSSessionID), - %% TODO: handle errors - true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), - ok. - -%% RPC target. --spec do_ensure_all_iterators_closed(id()) -> ok. -do_ensure_all_iterators_closed(DSSessionID) -> - ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, DSSessionID), - ok. - %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl index edaaea775..d9b882f3d 100644 --- a/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -52,18 +52,20 @@ open_iterator(Nodes, TopicFilter, StartMS, IteratorID) -> ). -spec get_streams( - node(), - emqx_ds:keyspace(), - emqx_ds:shard_id(), - emqx_ds:topic_filter(), - emqx_ds:time()) -> - [emqx_ds_storage_layer:stream()]. + node(), + emqx_ds:keyspace(), + emqx_ds:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() +) -> + [emqx_ds_storage_layer:stream()]. get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime) -> erpc:call( - Node, - emqx_ds_storage_layer, - get_streams, - [Keyspace, ShardId, TopicFilter, StartTime]). + Node, + emqx_ds_storage_layer, + get_streams, + [Keyspace, ShardId, TopicFilter, StartTime] + ). -spec close_iterator( [node()], diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 70cdd8d17..6a20afbf1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -13,31 +13,44 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + +%% @doc Main interface module for `emqx_durable_storage' application. +%% +%% It takes care of forwarding calls to the underlying DBMS. Currently +%% only the embedded `emqx_ds_replication_layer' storage is supported, +%% so all the calls are simply passed through. -module(emqx_ds). %% Management API: -export([open_db/2]). %% Message storage API: --export([message_store/1, message_store/2, message_store/3]). +-export([store_batch/1, store_batch/2, store_batch/3]). %% Message replay API: --export([get_streams/3, open_iterator/2, next/2]). +-export([get_streams/3, make_iterator/2, next/2]). -%% internal exports: +%% Misc. API: -export([]). --export_type([db/0, time/0, topic_filter/0, topic/0]). +-export_type([ + db/0, + time/0, + topic_filter/0, + topic/0, + stream/0, + stream_rank/0, + iterator/0, + next_result/1, next_result/0, + store_batch_result/0, + make_iterator_result/1, make_iterator_result/0 +]). %%================================================================================ %% Type declarations %%================================================================================ -%% Different DBs are completely independent from each other. They -%% could represent something like different tenants. -%% -%% Topics stored in different DBs aren't necesserily disjoint. --type db() :: binary(). +-type db() :: emqx_ds_replication_layer:db(). %% Parsed topic. -type topic() :: list(binary()). @@ -45,30 +58,22 @@ %% Parsed topic filter. -type topic_filter() :: list(binary() | '+' | '#' | ''). -%% This record enapsulates the stream entity from the replication -%% level. -%% -%% TODO: currently the stream is hardwired to only support the -%% internal rocksdb storage. In t he future we want to add another -%% implementations for emqx_ds, so this type has to take this into -%% account. --record(stream, - { shard :: emqx_ds_replication_layer:shard() - , enc :: emqx_ds_replication_layer:stream() - }). - -type stream_rank() :: {integer(), integer()}. --opaque stream() :: #stream{}. +-opaque stream() :: emqx_ds_replication_layer:stream(). -%% This record encapsulates the iterator entity from the replication -%% level. --record(iterator, - { shard :: emqx_ds_replication_layer:shard() - , enc :: enqx_ds_replication_layer:iterator() - }). +-opaque iterator() :: emqx_ds_replication_layer:iterator(). --opaque iterator() :: #iterator{}. +-type store_batch_result() :: ok | {error, _}. + +-type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}. + +-type make_iterator_result() :: make_iterator_result(iterator()). + +-type next_result(Iterator) :: + {ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. + +-type next_result() :: next_result(iterator()). %% Timestamp %% Earliest possible timestamp is 0. @@ -78,7 +83,9 @@ -type message_store_opts() :: #{}. --type create_db_opts() :: #{}. +-type create_db_opts() :: + %% TODO: keyspace + #{}. -type message_id() :: emqx_ds_replication_layer:message_id(). @@ -88,24 +95,24 @@ %% API funcions %%================================================================================ +%% @doc Different DBs are completely independent from each other. They +%% could represent something like different tenants. -spec open_db(db(), create_db_opts()) -> ok. open_db(DB, Opts) -> emqx_ds_replication_layer:open_db(DB, Opts). --spec message_store([emqx_types:message()]) -> - {ok, [message_id()]} | {error, _}. -message_store(Msgs) -> - message_store(?DEFAULT_DB, Msgs, #{}). +-spec store_batch([emqx_types:message()]) -> store_batch_result(). +store_batch(Msgs) -> + store_batch(?DEFAULT_DB, Msgs, #{}). --spec message_store(db(), [emqx_types:message()], message_store_opts()) -> - {ok, [message_id()]} | {error, _}. -message_store(DB, Msgs, Opts) -> - emqx_ds_replication_layer:message_store(DB, Msgs, Opts). +-spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). +store_batch(DB, Msgs, Opts) -> + emqx_ds_replication_layer:store_batch(DB, Msgs, Opts). %% TODO: Do we really need to return message IDs? It's extra work... --spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. -message_store(DB, Msgs) -> - message_store(DB, Msgs, #{}). +-spec store_batch(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. +store_batch(DB, Msgs) -> + store_batch(DB, Msgs, #{}). %% @doc Get a list of streams needed for replaying a topic filter. %% @@ -113,56 +120,44 @@ message_store(DB, Msgs) -> %% different locations or even in different databases. A wildcard %% topic filter may require pulling data from any number of locations. %% -%% Stream is an abstraction exposed by `emqx_ds' that reflects the -%% notion that different topics can be stored differently, but hides -%% the implementation details. +%% Stream is an abstraction exposed by `emqx_ds' that, on one hand, +%% reflects the notion that different topics can be stored +%% differently, but hides the implementation details. %% %% Rules: %% -%% 1. New streams matching the topic filter can appear without notice, -%% so the replayer must periodically call this function to get the -%% updated list of streams. +%% 0. There is no 1-to-1 mapping between MQTT topics and streams. One +%% stream can contain any number of MQTT topics. +%% +%% 1. New streams matching the topic filter and start time can appear +%% without notice, so the replayer must periodically call this +%% function to get the updated list of streams. %% %% 2. Streams may depend on one another. Therefore, care should be %% taken while replaying them in parallel to avoid out-of-order %% replay. This function returns stream together with its -%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is -%% different, then they can be replayed in parallel. If it's the -%% same, then the stream with smaller T coordinate should be replayed -%% first. +%% "coordinate": `stream_rank()'. +%% +%% Stream rank is a tuple of two integers, let's call them X and Y. If +%% X coordinate of two streams is different, they are independent and +%% can be replayed in parallel. If it's the same, then the stream with +%% smaller Y coordinate should be replayed first. If Y coordinates are +%% equal, then the streams are independent. +%% +%% Stream is fully consumed when `next/3' function returns +%% `end_of_stream'. Then the client can proceed to replaying streams +%% that depend on the given one. -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> - Shards = emqx_ds_replication_layer:list_shards(DB), - lists:flatmap( - fun(Shard) -> - Streams = emqx_ds_replication_layer:get_streams(Shard, TopicFilter, StartTime), - [{Rank, #stream{ shard = Shard - , enc = I - }} || {Rank, I} <- Streams] - end, - Shards). + emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime). --spec open_iterator(stream(), time()) -> {ok, iterator()} | {error, _}. -open_iterator(#stream{shard = Shard, enc = Stream}, StartTime) -> - case emqx_ds_replication_layer:open_iterator(Shard, Stream, StartTime) of - {ok, Iter} -> - {ok, #iterator{shard = Shard, enc = Iter}}; - Err = {error, _} -> - Err - end. +-spec make_iterator(stream(), time()) -> make_iterator_result(). +make_iterator(Stream, StartTime) -> + emqx_ds_replication_layer:make_iterator(Stream, StartTime). --spec next(iterator(), pos_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. -next(#iterator{shard = Shard, enc = Iter0}, BatchSize) -> - case emqx_ds_replication_layer:next(Shard, Iter0, BatchSize) of - {ok, Iter, Batch} -> - {ok, #iterator{shard = Shard, enc = Iter}, Batch}; - end_of_stream -> - end_of_stream - end. - -%%================================================================================ -%% behavior callbacks -%%================================================================================ +-spec next(iterator(), pos_integer()) -> next_result(). +next(Iter, BatchSize) -> + emqx_ds_replication_layer:next(Iter, BatchSize). %%================================================================================ %% Internal exports diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index 9d206ee81..fcc9f2b36 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -34,7 +34,8 @@ %% Type declarations %%================================================================================ --define(EOT, []). %% End Of Topic +%% End Of Topic +-define(EOT, []). -define(PLUS, '+'). -type edge() :: binary() | ?EOT | ?PLUS. @@ -49,17 +50,17 @@ -type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()). --record(trie, - { trie :: ets:tid() - , stats :: ets:tid() - }). +-record(trie, { + trie :: ets:tid(), + stats :: ets:tid() +}). -opaque trie() :: #trie{}. --record(trans, - { key :: {state(), edge()} - , next :: state() - }). +-record(trans, { + key :: {state(), edge()}, + next :: state() +}). %%================================================================================ %% API funcions @@ -70,9 +71,10 @@ trie_create() -> Trie = ets:new(trie, [{keypos, #trans.key}, set]), Stats = ets:new(stats, [{keypos, 1}, set]), - #trie{ trie = Trie - , stats = Stats - }. + #trie{ + trie = Trie, + stats = Stats + }. %% @doc Create a topic key, -spec topic_key(trie(), threshold_fun(), [binary()]) -> msg_storage_key(). @@ -86,7 +88,7 @@ lookup_topic_key(Trie, Tokens) -> %% @doc Return list of keys of topics that match a given topic filter -spec match_topics(trie(), [binary() | '+' | '#']) -> - [{static_key(), _Varying :: binary() | ?PLUS}]. + [{static_key(), _Varying :: binary() | ?PLUS}]. match_topics(Trie, TopicFilter) -> do_match_topics(Trie, ?PREFIX, [], TopicFilter). @@ -96,38 +98,43 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) -> L = ets:tab2list(Trie), {Nodes0, Edges} = lists:foldl( - fun(#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) -> - Edge = {From, To, Label}, - {[From, To] ++ AccN, [Edge|AccEdge]} - end, - {[], []}, - L), + fun(#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) -> + Edge = {From, To, Label}, + {[From, To] ++ AccN, [Edge | AccEdge]} + end, + {[], []}, + L + ), Nodes = lists:map( - fun(Node) -> - case ets:lookup(Stats, Node) of - [{_, NChildren}] -> ok; - [] -> NChildren = 0 - end, - {Node, NChildren} - end, - lists:usort(Nodes0)), - {ok, FD} = file:open(Filename, [write]), - Print = fun (?PREFIX) -> "prefix"; - (NodeId) -> binary:encode_hex(NodeId) + fun(Node) -> + case ets:lookup(Stats, Node) of + [{_, NChildren}] -> ok; + [] -> NChildren = 0 + end, + {Node, NChildren} end, + lists:usort(Nodes0) + ), + {ok, FD} = file:open(Filename, [write]), + Print = fun + (?PREFIX) -> "prefix"; + (NodeId) -> binary:encode_hex(NodeId) + end, io:format(FD, "digraph {~n", []), lists:foreach( - fun({Node, NChildren}) -> - Id = Print(Node), - io:format(FD, " \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren]) - end, - Nodes), + fun({Node, NChildren}) -> + Id = Print(Node), + io:format(FD, " \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren]) + end, + Nodes + ), lists:foreach( - fun({From, To, Label}) -> - io:format(FD, " \"~s\" -> \"~s\" [label=\"~s\"];~n", [Print(From), Print(To), Label]) - end, - Edges), + fun({From, To, Label}) -> + io:format(FD, " \"~s\" -> \"~s\" [label=\"~s\"];~n", [Print(From), Print(To), Label]) + end, + Edges + ), io:format(FD, "}~n", []), file:close(FD). @@ -135,12 +142,12 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) -> %% Internal exports %%================================================================================ --spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined - when Wildcard :: boolean(). +-spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined when + Wildcard :: boolean(). trie_next(#trie{trie = Trie}, State, ?EOT) -> case ets:lookup(Trie, {State, ?EOT}) of [#trans{next = Next}] -> {false, Next}; - [] -> undefined + [] -> undefined end; trie_next(#trie{trie = Trie}, State, Token) -> case ets:lookup(Trie, {State, ?PLUS}) of @@ -149,25 +156,27 @@ trie_next(#trie{trie = Trie}, State, Token) -> [] -> case ets:lookup(Trie, {State, Token}) of [#trans{next = Next}] -> {false, Next}; - [] -> undefined + [] -> undefined end end. --spec trie_insert(trie(), state(), edge()) -> {Updated, state()} - when Updated :: false | non_neg_integer(). +-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when + Updated :: false | non_neg_integer(). trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) -> Key = {State, Token}, NewState = get_id_for_key(State, Token), - Rec = #trans{ key = Key - , next = NewState - }, + Rec = #trans{ + key = Key, + next = NewState + }, case ets:insert_new(Trie, Rec) of true -> - Inc = case Token of - ?EOT -> 0; - ?PLUS -> 0; - _ -> 1 - end, + Inc = + case Token of + ?EOT -> 0; + ?PLUS -> 0; + _ -> 1 + end, NChildren = ets:update_counter(Stats, State, {2, Inc}, {State, 0}), {NChildren, NewState}; false -> @@ -202,69 +211,75 @@ get_id_for_key(_State, _Token) -> do_match_topics(Trie, State, Varying, []) -> case trie_next(Trie, State, ?EOT) of {false, Static} -> [{Static, lists:reverse(Varying)}]; - undefined -> [] + undefined -> [] end; do_match_topics(Trie, State, Varying, ['#']) -> Emanating = emanating(Trie, State, ?PLUS), lists:flatmap( - fun({?EOT, Static}) -> - [{Static, lists:reverse(Varying)}]; - ({?PLUS, NextState}) -> - do_match_topics(Trie, NextState, [?PLUS|Varying], ['#']); - ({_, NextState}) -> - do_match_topics(Trie, NextState, Varying, ['#']) - end, - Emanating); -do_match_topics(Trie, State, Varying, [Level|Rest]) -> + fun + ({?EOT, Static}) -> + [{Static, lists:reverse(Varying)}]; + ({?PLUS, NextState}) -> + do_match_topics(Trie, NextState, [?PLUS | Varying], ['#']); + ({_, NextState}) -> + do_match_topics(Trie, NextState, Varying, ['#']) + end, + Emanating + ); +do_match_topics(Trie, State, Varying, [Level | Rest]) -> Emanating = emanating(Trie, State, Level), lists:flatmap( - fun({?EOT, _NextState}) -> - []; - ({?PLUS, NextState}) -> - do_match_topics(Trie, NextState, [Level|Varying], Rest); - ({_, NextState}) -> - do_match_topics(Trie, NextState, Varying, Rest) - end, - Emanating). + fun + ({?EOT, _NextState}) -> + []; + ({?PLUS, NextState}) -> + do_match_topics(Trie, NextState, [Level | Varying], Rest); + ({_, NextState}) -> + do_match_topics(Trie, NextState, Varying, Rest) + end, + Emanating + ). -spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) -> - {ok, msg_storage_key()} | undefined. + {ok, msg_storage_key()} | undefined. do_lookup_topic_key(Trie, State, [], Varying) -> - case trie_next(Trie, State, ?EOT) of - {false, Static} -> - {ok, {Static, lists:reverse(Varying)}}; - undefined -> - undefined - end; -do_lookup_topic_key(Trie, State, [Tok|Rest], Varying) -> - case trie_next(Trie, State, Tok) of - {true, NextState} -> - do_lookup_topic_key(Trie, NextState, Rest, [Tok|Varying]); - {false, NextState} -> - do_lookup_topic_key(Trie, NextState, Rest, Varying); - undefined -> - undefined - end. + case trie_next(Trie, State, ?EOT) of + {false, Static} -> + {ok, {Static, lists:reverse(Varying)}}; + undefined -> + undefined + end; +do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) -> + case trie_next(Trie, State, Tok) of + {true, NextState} -> + do_lookup_topic_key(Trie, NextState, Rest, [Tok | Varying]); + {false, NextState} -> + do_lookup_topic_key(Trie, NextState, Rest, Varying); + undefined -> + undefined + end. do_topic_key(Trie, _, _, State, [], Varying) -> {_, false, Static} = trie_next_(Trie, State, ?EOT), {Static, lists:reverse(Varying)}; -do_topic_key(Trie, ThresholdFun, Depth, State, [Tok|Rest], Varying0) -> - Threshold = ThresholdFun(Depth), % TODO: it's not necessary to call it every time. - Varying = case trie_next_(Trie, State, Tok) of - {NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold -> - {_, NextState} = trie_insert(Trie, State, ?PLUS), - [Tok|Varying0]; - {_, false, NextState} -> - Varying0; - {_, true, NextState} -> - [Tok|Varying0] - end, +do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) -> + % TODO: it's not necessary to call it every time. + Threshold = ThresholdFun(Depth), + Varying = + case trie_next_(Trie, State, Tok) of + {NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold -> + {_, NextState} = trie_insert(Trie, State, ?PLUS), + [Tok | Varying0]; + {_, false, NextState} -> + Varying0; + {_, true, NextState} -> + [Tok | Varying0] + end, do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying). --spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} - when New :: false | non_neg_integer(), - Wildcard :: boolean(). +-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when + New :: false | non_neg_integer(), + Wildcard :: boolean(). trie_next_(Trie, State, Token) -> case trie_next(Trie, State, Token) of {Wildcard, NextState} -> @@ -278,19 +293,26 @@ trie_next_(Trie, State, Token) -> %% erlfmt-ignore -spec emanating(trie(), state(), edge()) -> [{edge(), state()}]. emanating(#trie{trie = Tab}, State, ?PLUS) -> - ets:select(Tab, ets:fun2ms( - fun(#trans{key = {S, Edge}, next = Next}) when S == State -> - {Edge, Next} - end)); + ets:select( + Tab, + ets:fun2ms( + fun(#trans{key = {S, Edge}, next = Next}) when S == State -> + {Edge, Next} + end + ) + ); emanating(#trie{trie = Tab}, State, ?EOT) -> case ets:lookup(Tab, {State, ?EOT}) of [#trans{next = Next}] -> [{?EOT, Next}]; - [] -> [] + [] -> [] end; emanating(#trie{trie = Tab}, State, Bin) when is_binary(Bin) -> - [{Edge, Next} || #trans{key = {_, Edge}, next = Next} <- - ets:lookup(Tab, {State, ?PLUS}) ++ - ets:lookup(Tab, {State, Bin})]. + [ + {Edge, Next} + || #trans{key = {_, Edge}, next = Next} <- + ets:lookup(Tab, {State, ?PLUS}) ++ + ets:lookup(Tab, {State, Bin}) + ]. %%================================================================================ %% Tests @@ -325,56 +347,71 @@ lookup_key_test() -> {_, S1} = trie_insert(T, ?PREFIX, <<"foo">>), {_, S11} = trie_insert(T, S1, <<"foo">>), %% Topics don't match until we insert ?EOT: - ?assertMatch( undefined - , lookup_topic_key(T, [<<"foo">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"foo">>, <<"foo">>]) - ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"foo">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"foo">>, <<"foo">>]) + ), {_, S10} = trie_insert(T, S1, ?EOT), {_, S110} = trie_insert(T, S11, ?EOT), - ?assertMatch( {ok, {S10, []}} - , lookup_topic_key(T, [<<"foo">>]) - ), - ?assertMatch( {ok, {S110, []}} - , lookup_topic_key(T, [<<"foo">>, <<"foo">>]) - ), + ?assertMatch( + {ok, {S10, []}}, + lookup_topic_key(T, [<<"foo">>]) + ), + ?assertMatch( + {ok, {S110, []}}, + lookup_topic_key(T, [<<"foo">>, <<"foo">>]) + ), %% The rest of keys still don't match: - ?assertMatch( undefined - , lookup_topic_key(T, [<<"bar">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"bar">>, <<"foo">>]) - ). + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"bar">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"bar">>, <<"foo">>]) + ). wildcard_lookup_test() -> T = trie_create(), {1, S1} = trie_insert(T, ?PREFIX, <<"foo">>), - {0, S11} = trie_insert(T, S1, ?PLUS), %% Plus doesn't increase the number of children + %% Plus doesn't increase the number of children + {0, S11} = trie_insert(T, S1, ?PLUS), {1, S111} = trie_insert(T, S11, <<"foo">>), - {0, S1110} = trie_insert(T, S111, ?EOT), %% ?EOT doesn't increase the number of children - ?assertMatch( {ok, {S1110, [<<"bar">>]}} - , lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"foo">>]) - ), - ?assertMatch( {ok, {S1110, [<<"quux">>]}} - , lookup_topic_key(T, [<<"foo">>, <<"quux">>, <<"foo">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"foo">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"foo">>, <<"bar">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"bar">>]) - ), - ?assertMatch( undefined - , lookup_topic_key(T, [<<"bar">>, <<"foo">>, <<"foo">>]) - ), + %% ?EOT doesn't increase the number of children + {0, S1110} = trie_insert(T, S111, ?EOT), + ?assertMatch( + {ok, {S1110, [<<"bar">>]}}, + lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"foo">>]) + ), + ?assertMatch( + {ok, {S1110, [<<"quux">>]}}, + lookup_topic_key(T, [<<"foo">>, <<"quux">>, <<"foo">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"foo">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"foo">>, <<"bar">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"bar">>]) + ), + ?assertMatch( + undefined, + lookup_topic_key(T, [<<"bar">>, <<"foo">>, <<"foo">>]) + ), {_, S10} = trie_insert(T, S1, ?EOT), - ?assertMatch( {ok, {S10, []}} - , lookup_topic_key(T, [<<"foo">>]) - ). + ?assertMatch( + {ok, {S10, []}}, + lookup_topic_key(T, [<<"foo">>]) + ). %% erlfmt-ignore topic_key_test() -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 846d2ca0c..5d4749c30 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -16,33 +16,52 @@ -module(emqx_ds_replication_layer). -export([ - list_shards/1, - open_db/2, - message_store/3, - get_streams/3, - open_iterator/3, - next/3 - ]). - + list_shards/1, + open_db/2, + store_batch/3, + get_streams/3, + make_iterator/2, + next/2 +]). %% internal exports: --export([ do_open_shard_v1/2, - do_get_streams_v1/3, - do_open_iterator_v1/3, - do_next_v1/3 - ]). +-export([ + do_open_shard_v1/2, + do_get_streams_v1/3, + do_make_iterator_v1/3, + do_next_v1/3 +]). --export_type([shard/0, stream/0, iterator/0, message_id/0]). +-export_type([shard_id/0, stream/0, iterator/0, message_id/0]). %%================================================================================ %% Type declarations %%================================================================================ --opaque stream() :: emqx_ds_storage_layer:stream(). +-type db() :: binary(). --type shard() :: binary(). +-type shard_id() :: binary(). --opaque iterator() :: emqx_ds_storage_layer:iterator(). +%% This record enapsulates the stream entity from the replication +%% level. +%% +%% TODO: currently the stream is hardwired to only support the +%% internal rocksdb storage. In t he future we want to add another +%% implementations for emqx_ds, so this type has to take this into +%% account. +-record(stream, { + shard :: emqx_ds_replication_layer:shard_id(), + enc :: emqx_ds_replication_layer:stream() +}). + +-opaque stream() :: stream(). + +-record(iterator, { + shard :: emqx_ds_replication_layer:shard_id(), + enc :: enqx_ds_replication_layer:iterator() +}). + +-opaque iterator() :: #iterator{}. -type message_id() :: emqx_ds_storage_layer:message_id(). @@ -50,44 +69,71 @@ %% API functions %%================================================================================ --spec list_shards(emqx_ds:db()) -> [shard()]. +-spec list_shards(emqx_ds:db()) -> [shard_id()]. list_shards(DB) -> %% TODO: milestone 5 lists:map( - fun(Node) -> - shard_id(DB, Node) - end, - list_nodes()). + fun(Node) -> + shard_id(DB, Node) + end, + list_nodes() + ). --spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok. +-spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}. open_db(DB, Opts) -> + %% TODO: improve error reporting, don't just crash lists:foreach( - fun(Node) -> - Shard = shard_id(DB, Node), - emqx_ds_proto_v1:open_shard(Node, Shard, Opts) - end, - list_nodes()). + fun(Node) -> + Shard = shard_id(DB, Node), + ok = emqx_ds_proto_v1:open_shard(Node, Shard, Opts) + end, + list_nodes() + ). --spec message_store(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - {ok, [message_id()]} | {error, _}. -message_store(DB, Msg, Opts) -> - %% TODO: milestone 5. Currently we store messages locally. - Shard = term_to_binary({DB, node()}), - emqx_ds_storage_layer:message_store(Shard, Msg, Opts). +-spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + emqx_ds:store_batch_result(). +store_batch(DB, Msg, Opts) -> + %% TODO: Currently we store messages locally. + Shard = shard_id(DB, node()), + emqx_ds_storage_layer:store_batch(Shard, Msg, Opts). --spec get_streams(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. -get_streams(Shard, TopicFilter, StartTime) -> +-spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [{emqx_ds:stream_rank(), stream()}]. +get_streams(DB, TopicFilter, StartTime) -> + Shards = emqx_ds_replication_layer:list_shards(DB), + lists:flatmap( + fun(Shard) -> + Node = node_of_shard(Shard), + Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime), + lists:map( + fun({RankY, Stream}) -> + RankX = erlang:phash2(Shard, 255), + Rank = {RankX, RankY}, + {Rank, #stream{ + shard = Shard, + enc = Stream + }} + end, + Streams + ) + end, + Shards + ). + +-spec make_iterator(stream(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()). +make_iterator(Stream, StartTime) -> + #stream{shard = Shard, enc = StorageStream} = Stream, Node = node_of_shard(Shard), - emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime). + case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, StartTime) of + {ok, Iter} -> + {ok, #iterator{shard = Shard, enc = Iter}}; + Err = {error, _} -> + Err + end. --spec open_iterator(shard(), stream(), emqx_ds:time()) -> {ok, iterator()} | {error, _}. -open_iterator(Shard, Stream, StartTime) -> - Node = node_of_shard(Shard), - emqx_ds_proto_v1:open_iterator(Node, Shard, Stream, StartTime). - --spec next(shard(), iterator(), pos_integer()) -> - {ok, iterator(), [emqx_types:message()]} | end_of_stream. -next(Shard, Iter, BatchSize) -> +-spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). +next(Iter0, BatchSize) -> + #iterator{shard = Shard, enc = StorageIter0} = Iter0, Node = node_of_shard(Shard), %% TODO: iterator can contain information that is useful for %% reconstructing messages sent over the network. For example, @@ -97,7 +143,13 @@ next(Shard, Iter, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic lary. - emqx_ds_proto_v1:next(Node, Shard, Iter, BatchSize). + case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of + {ok, StorageIter, Batch} -> + Iter = #iterator{shard = Shard, enc = StorageIter}, + {ok, Iter, Batch}; + Other -> + Other + end. %%================================================================================ %% behavior callbacks @@ -107,35 +159,38 @@ next(Shard, Iter, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok. +-spec do_open_shard_v1(shard_id(), emqx_ds:create_db_opts()) -> ok. do_open_shard_v1(Shard, Opts) -> - emqx_ds_storage_layer_sup:ensure_shard(Shard, Opts). + emqx_ds_storage_layer:open_shard(Shard, Opts). --spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> - [{emqx_ds:stream_rank(), stream()}]. +-spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [{integer(), _Stream}]. do_get_streams_v1(Shard, TopicFilter, StartTime) -> - error({todo, Shard, TopicFilter, StartTime}). + emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime). --spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) -> iterator(). -do_open_iterator_v1(Shard, Stream, StartTime) -> - error({todo, Shard, Stream, StartTime}). +-spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:time()) -> {ok, iterator()} | {error, _}. +do_make_iterator_v1(Shard, Stream, StartTime) -> + emqx_ds_storage_layer:make_iterator(Shard, Stream, StartTime). --spec do_next_v1(shard(), iterator(), non_neg_integer()) -> - {ok, iterator(), [emqx_types:message()]} | end_of_stream. +-spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter). do_next_v1(Shard, Iter, BatchSize) -> - error({todo, Shard, Iter, BatchSize}). + emqx_ds_storage_layer:next(Shard, Iter, BatchSize). %%================================================================================ %% Internal functions %%================================================================================ +add_shard_to_rank(Shard, RankY) -> + RankX = erlang:phash2(Shard, 255), + {RankX, RankY}. + shard_id(DB, Node) -> %% TODO: don't bake node name into the schema, don't repeat the %% Mnesia's 1M$ mistake. NodeBin = atom_to_binary(Node), - <>. + <>. --spec node_of_shard(shard()) -> node(). +-spec node_of_shard(shard_id()) -> node(). node_of_shard(ShardId) -> [_DB, NodeBin] = binary:split(ShardId, <<":">>), binary_to_atom(NodeBin). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 93c1aaa1f..fdd81a095 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -1,332 +1,240 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_ds_storage_layer). -behaviour(gen_server). -%% API: --export([start_link/2]). --export([create_generation/3]). +%% Replication layer API: +-export([open_shard/2, store_batch/3, get_streams/3, make_iterator/3, next/3]). --export([get_streams/3]). --export([message_store/3]). --export([delete/4]). +%% gen_server +-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --export([make_iterator/2, next/1, next/2]). +%% internal exports: +-export([]). --export([ - preserve_iterator/2, - restore_iterator/2, - discard_iterator/2, - ensure_iterator/3, - discard_iterator_prefix/2, - list_iterator_prefix/2, - foldl_iterator_prefix/4 -]). - -%% behaviour callbacks: --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - --export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]). --export_type([db_options/0, db_write_options/0, db_read_options/0]). - --compile({inline, [meta_lookup/2]}). - --include_lib("emqx/include/emqx.hrl"). +-export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]). %%================================================================================ %% Type declarations %%================================================================================ --type stream() :: term(). %% Opaque term returned by the generation callback module - --type options() :: #{ - dir => file:filename() -}. - -%% see rocksdb:db_options() --type db_options() :: proplists:proplist(). -%% see rocksdb:write_options() --type db_write_options() :: proplists:proplist(). -%% see rocksdb:read_options() --type db_read_options() :: proplists:proplist(). +-type shard_id() :: emqx_ds_replication_layer:shard_id(). -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. -%% Message storage generation -%% Keep in mind that instances of this type are persisted in long-term storage. --type generation() :: #{ - %% Module that handles data for the generation +-type gen_id() :: 0..16#ffff. + +%% Note: this record might be stored permanently on a remote node. +-record(stream, { + generation :: gen_id(), + enc :: _EncapsultatedData, + misc = #{} :: map() +}). + +-opaque stream() :: #stream{}. + +%% Note: this record might be stored permanently on a remote node. +-record(it, { + generation :: gen_id(), + enc :: _EncapsultatedData, + misc = #{} :: map() +}). + +-opaque iterator() :: #it{}. + +%%%% Generation: + +-type generation(Data) :: #{ + %% Module that handles data for the generation: module := module(), - %% Module-specific data defined at generation creation time - data := term(), + %% Module-specific data defined at generation creation time: + data := Data, %% When should this generation become active? %% This generation should only contain messages timestamped no earlier than that. %% The very first generation will have `since` equal 0. - since := emqx_ds:time() + since := emqx_ds:time(), + until := emqx_ds:time() | undefined }. --record(s, { - shard :: emqx_ds:shard(), - keyspace :: emqx_ds_conf:keyspace(), - db :: rocksdb:db_handle(), - cf_iterator :: rocksdb:cf_handle(), - cf_generations :: cf_refs() -}). +%% Schema for a generation. Persistent term. +-type generation_schema() :: generation(term()). --record(it, { - shard :: emqx_ds:shard(), - gen :: gen_id(), - replay :: emqx_ds:replay(), - module :: module(), - data :: term() -}). +%% Runtime view of generation: +-type generation() :: generation(term()). --type gen_id() :: 0..16#ffff. +%%%% Shard: --opaque state() :: #s{}. --opaque iterator() :: #it{}. +-type shard(GenData) :: #{ + current_generation := gen_id(), + default_generation_module := module(), + default_generation_config := term(), + {generation, gen_id()} => GenData +}. -%% Contents of the default column family: -%% -%% [{<<"genNN">>, #generation{}}, ..., -%% {<<"current">>, GenID}] +%% Shard schema (persistent): +-type shard_schema() :: shard(generation_schema()). --define(DEFAULT_CF, "default"). --define(DEFAULT_CF_OPTS, []). +%% Shard (runtime): +-type shard() :: shard(generation()). --define(ITERATOR_CF, "$iterators"). +%%================================================================================ +%% Generation callbacks +%%================================================================================ -%% TODO -%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`. -%% 2. Supposedly might be compressed _very_ effectively. -%% 3. `inplace_update_support`? --define(ITERATOR_CF_OPTS, []). +%% Create the new schema given generation id and the options. +%% Create rocksdb column families. +-callback create(shard_id(), rocksdb:db_handle(), gen_id(), _Options) -> + {_Schema, cf_refs()}. + +%% Open the existing schema +-callback open(shard_id(), rocsdb:db_handle(), gen_id(), cf_refs(), _Schema) -> + _Data. + +-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) -> + ok. + +-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> + [_Stream]. + +-callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:time()) -> + emqx_ds:make_iterator_result(_Iterator). + +-callback next(shard_id(), _Data, Iter, pos_integer()) -> + {ok, Iter, [emqx_types:message()]} | {error, _}. + +%%================================================================================ +%% API for the replication layer +%%================================================================================ + +-spec open_shard(shard_id(), emqx_ds:create_db_opts()) -> ok. +open_shard(Shard, Options) -> + emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). + +-spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + emqx_ds:store_batch_result(). +store_batch(Shard, Messages, Options) -> + %% We always store messages in the current generation: + GenId = generation_current(Shard), + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + Mod:store_batch(Shard, GenData, Messages, Options). + +-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> + [{integer(), stream()}]. +get_streams(Shard, TopicFilter, StartTime) -> + Gens = generations_since(Shard, StartTime), + lists:flatmap( + fun(GenId) -> + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime), + [ + {GenId, #stream{ + generation = GenId, + enc = Stream + }} + || Stream <- Streams + ] + end, + Gens + ). + +-spec make_iterator(shard_id(), stream(), emqx_ds:time()) -> + emqx_ds:make_iterator_result(iterator()). +make_iterator(Shard, #stream{generation = GenId, enc = Stream}, StartTime) -> + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + case Mod:make_iterator(Shard, GenData, Stream, StartTime) of + {ok, Iter} -> + {ok, #it{ + generation = GenId, + enc = Iter + }}; + {error, _} = Err -> + Err + end. + +-spec next(shard_id(), iterator(), pos_integer()) -> + emqx_ds:next_result(iterator()). +next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) -> + #{module := Mod, data := GenData} = generation_get(Shard, GenId), + Current = generation_current(Shard), + case Mod:next(Shard, GenData, GenIter0, BatchSize) of + {ok, _GenIter, []} when GenId < Current -> + %% This is a past generation. Storage layer won't write + %% any more messages here. The iterator reached the end: + %% the stream has been fully replayed. + {ok, end_of_stream}; + {ok, GenIter, Batch} -> + {ok, Iter#it{enc = GenIter}, Batch}; + Error = {error, _} -> + Error + end. + +%%================================================================================ +%% gen_server for the shard +%%================================================================================ -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). -%%================================================================================ -%% Callbacks -%%================================================================================ - --callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> - {_Schema, cf_refs()}. - --callback open( - emqx_ds:shard(), - rocksdb:db_handle(), - gen_id(), - cf_refs(), - _Schema -) -> - _DB. - --callback store( - _DB, - _MessageID :: binary(), - emqx_ds:time(), - emqx_ds:topic(), - _Payload :: binary() -) -> - ok | {error, _}. - --callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) -> - ok | {error, _}. - --callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) -> - [_Stream]. - --callback make_iterator(_DB, emqx_ds:replay()) -> - {ok, _It} | {error, _}. - --callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}. - --callback preserve_iterator(_It) -> term(). - --callback next(It) -> {value, binary(), It} | none | {error, closed}. - -%%================================================================================ -%% API funcions -%%================================================================================ - --spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> +-spec start_link(emqx_ds:shard_id(), emqx_ds:create_db_opts()) -> {ok, pid()}. start_link(Shard, Options) -> gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). --spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. -get_streams(_ShardId, _TopicFilter, _StartTime) -> - []. +-record(s, { + shard_id :: emqx_ds:shard_id(), + db :: rocksdb:db_handle(), + cf_refs :: cf_refs(), + schema :: shard_schema(), + shard :: shard() +}). +-type server_state() :: #s{}. --spec create_generation( - emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() -) -> - {ok, gen_id()} | {error, nonmonotonic}. -create_generation(ShardId, Since, Config = {_Module, _Options}) -> - gen_server:call(?REF(ShardId), {create_generation, Since, Config}). +-define(DEFAULT_CF, "default"). +-define(DEFAULT_CF_OPTS, []). --spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - {ok, _MessageId} | {error, _}. -message_store(Shard, Msgs, _Opts) -> - {ok, lists:map( - fun(Msg) -> - GUID = emqx_message:id(Msg), - Timestamp = Msg#message.timestamp, - {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp), - Topic = emqx_topic:words(emqx_message:topic(Msg)), - Payload = serialize(Msg), - Mod:store(ModState, GUID, Timestamp, Topic, Payload), - GUID - end, - Msgs)}. - --spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> - ok | {error, _}. -delete(Shard, GUID, Time, Topic) -> - {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), - Mod:delete(Data, GUID, Time, Topic). - --spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> - {ok, iterator()} | {error, _TODO}. -make_iterator(Shard, Replay = {_, StartTime}) -> - {GenId, Gen} = meta_lookup_gen(Shard, StartTime), - open_iterator(Gen, #it{ - shard = Shard, - gen = GenId, - replay = Replay - }). - --spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream. -next(It = #it{}) -> - next(It, _BatchSize = 1). - --spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream. -next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) -> - end_of_stream; -next( - It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize -) -> - #{data := DBData} = meta_get_gen(Shard, Gen), - {ok, ItData} = Mod:restore_iterator(DBData, Serialized), - next(It#it{data = ItData}, BatchSize); -next(It = #it{}, BatchSize) -> - do_next(It, BatchSize, _Acc = []). - --spec do_next(iterator(), non_neg_integer(), [binary()]) -> - {ok, iterator(), [binary()]} | end_of_stream. -do_next(It, N, Acc) when N =< 0 -> - {ok, It, lists:reverse(Acc)}; -do_next(It = #it{module = Mod, data = ItData}, N, Acc) -> - case Mod:next(ItData) of - {value, Bin, ItDataNext} -> - Val = deserialize(Bin), - do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]); - {error, _} = _Error -> - %% todo: log? - %% iterator might be invalid now; will need to re-open it. - Serialized = Mod:preserve_iterator(ItData), - {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; - none -> - case open_next_iterator(It) of - {ok, ItNext} -> - do_next(ItNext, N, Acc); - {error, _} = _Error -> - %% todo: log? - %% fixme: only bad options may lead to this? - %% return an "empty" iterator to be re-opened when retrying? - Serialized = Mod:preserve_iterator(ItData), - {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; - none -> - case Acc of - [] -> - end_of_stream; - _ -> - {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)} - end - end - end. - --spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> - ok | {error, _TODO}. -preserve_iterator(It = #it{}, IteratorID) -> - iterator_put_state(IteratorID, It). - --spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> - {ok, iterator()} | {error, _TODO}. -restore_iterator(Shard, ReplayID) -> - case iterator_get_state(Shard, ReplayID) of - {ok, Serial} -> - restore_iterator_state(Shard, Serial); - not_found -> - {error, not_found}; - {error, _Reason} = Error -> - Error - end. - --spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> - {ok, iterator()} | {error, _TODO}. -ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> - case restore_iterator(Shard, IteratorID) of - {ok, It} -> - {ok, It}; - {error, not_found} -> - {ok, It} = make_iterator(Shard, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - {ok, It}; - Error -> - Error - end. - --spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> - ok | {error, _TODO}. -discard_iterator(Shard, ReplayID) -> - iterator_delete(Shard, ReplayID). - --spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> - ok | {error, _TODO}. -discard_iterator_prefix(Shard, KeyPrefix) -> - case do_discard_iterator_prefix(Shard, KeyPrefix) of - {ok, _} -> ok; - Error -> Error - end. - --spec list_iterator_prefix( - emqx_ds:shard(), - binary() -) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. -list_iterator_prefix(Shard, KeyPrefix) -> - do_list_iterator_prefix(Shard, KeyPrefix). - --spec foldl_iterator_prefix( - emqx_ds:shard(), - binary(), - fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), - Acc -) -> {ok, Acc} | {error, _TODO} when - Acc :: term(). -foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). - -%%================================================================================ -%% behaviour callbacks -%%================================================================================ - -init({Shard, Options}) -> +init({ShardId, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard, Options), - S = ensure_current_generation(S0), - ok = populate_metadata(S), + erase_schema_runtime(ShardId), + {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options), + {Schema, CFRefs} = + case get_schema_persistent(DB) of + not_found -> + create_new_shard_schema(ShardId, DB, CFRefs0, Options); + Scm -> + {Scm, CFRefs0} + end, + Shard = open_shard(ShardId, DB, CFRefs, Schema), + S = #s{ + shard_id = ShardId, + db = DB, + cf_refs = CFRefs, + schema = Schema, + shard = Shard + }, + commit_metadata(S), {ok, S}. -handle_call({create_generation, Since, Config}, _From, S) -> - case create_new_gen(Since, Config, S) of - {ok, GenId, NS} -> - {reply, {ok, GenId}, NS}; - {error, _} = Error -> - {reply, Error, S} - end; +%% handle_call({create_generation, Since, Config}, _From, S) -> +%% case create_new_gen(Since, Config, S) of +%% {ok, GenId, NS} -> +%% {reply, {ok, GenId}, NS}; +%% {error, _} = Error -> +%% {reply, Error, S} +%% end; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -336,359 +244,156 @@ handle_cast(_Cast, S) -> handle_info(_Info, S) -> {noreply, S}. -terminate(_Reason, #s{db = DB, shard = Shard}) -> - meta_erase(Shard), +terminate(_Reason, #s{db = DB, shard_id = ShardId}) -> + erase_schema_runtime(ShardId), ok = rocksdb:close(DB). +%%================================================================================ +%% Internal exports +%%================================================================================ + %%================================================================================ %% Internal functions %%================================================================================ --record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). +-spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) -> + shard(). +open_shard(ShardId, DB, CFRefs, ShardSchema) -> + %% Transform generation schemas to generation runtime data: + maps:map( + fun + ({generation, GenId}, GenSchema) -> + open_generation(ShardId, DB, CFRefs, GenId, GenSchema); + (_K, Val) -> + Val + end, + ShardSchema + ). --spec populate_metadata(state()) -> ok. -populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) -> - ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}), - Current = schema_get_current(DBHandle), - lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). +-spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> + generation(). +open_generation(ShardId, DB, CFRefs, GenId, GenSchema) -> + #{module := Mod, data := Schema} = GenSchema, + RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema), + GenSchema#{data => RuntimeData}. --spec populate_metadata(gen_id(), state()) -> ok. -populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> - Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), - meta_register_gen(Shard, GenId, Gen). - --spec ensure_current_generation(state()) -> state(). -ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) -> - case schema_get_current(DBHandle) of - undefined -> - Config = emqx_ds_conf:keyspace_config(Keyspace), - {ok, _, NS} = create_new_gen(0, Config, S), - NS; - _GenId -> - S - end. - --spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> - {ok, gen_id(), state()} | {error, nonmonotonic}. -create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> - GenId = get_next_id(meta_get_current(Shard)), - GenId = get_next_id(schema_get_current(DBHandle)), - case is_gen_valid(Shard, GenId, Since) of - ok -> - {ok, Gen, NS} = create_gen(GenId, Since, Config, S), - %% TODO: Transaction? Column family creation can't be transactional, anyway. - ok = schema_put_gen(DBHandle, GenId, Gen), - ok = schema_put_current(DBHandle, GenId), - ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)), - {ok, GenId, NS}; - {error, _} = Error -> - Error - end. - --spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> - {ok, generation(), state()}. -create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> - % TODO: Backend implementation should ensure idempotency. - {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), - Gen = #{ - module => Module, - data => Schema, - since => Since +-spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) -> + {shard_schema(), cf_refs()}. +create_new_shard_schema(ShardId, DB, CFRefs, _Options) -> + GenId = 1, + %% TODO: read from options/config + Mod = emqx_ds_storage_reference, + ModConfig = #{}, + {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConfig), + GenSchema = #{module => Mod, data => GenData, since => 0, until => undefined}, + ShardSchema = #{ + current_generation => GenId, + default_generation_module => Mod, + default_generation_confg => ModConfig, + {generation, GenId} => GenSchema }, - {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. + {ShardSchema, NewCFRefs ++ CFRefs}. --spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. -open_db(Shard, Options) -> +%% @doc Commit current state of the server to both rocksdb and the persistent term +-spec commit_metadata(server_state()) -> ok. +commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) -> + ok = put_schema_persistent(DB, Schema), + put_schema_runtime(ShardId, Runtime). + +-spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) -> + {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. +rocksdb_open(Shard, Options) -> DefaultDir = binary_to_list(Shard), DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), - %% TODO: properly forward keyspace - Keyspace = maps:get(keyspace, Options, default_keyspace), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} - | emqx_ds_conf:db_options(Keyspace) + | maps:get(db_options, Options, []) ], _ = filelib:ensure_dir(DBDir), ExistingCFs = case rocksdb:list_column_families(DBDir, DBOptions) of {ok, CFs} -> - [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; + [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF]; % DB is not present. First start {error, {db_open, _}} -> [] end, ColumnFamilies = [ - {?DEFAULT_CF, ?DEFAULT_CF_OPTS}, - {?ITERATOR_CF, ?ITERATOR_CF_OPTS} + {?DEFAULT_CF, ?DEFAULT_CF_OPTS} | ExistingCFs ], case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of - {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> + {ok, DBHandle, [_CFDefault | CFRefs]} -> {CFNames, _} = lists:unzip(ExistingCFs), - {ok, #s{ - shard = Shard, - keyspace = Keyspace, - db = DBHandle, - cf_iterator = CFIterator, - cf_generations = lists:zip(CFNames, CFRefs) - }}; + {ok, DBHandle, lists:zip(CFNames, CFRefs)}; Error -> Error end. --spec open_gen(gen_id(), generation(), state()) -> generation(). -open_gen( - GenId, - Gen = #{module := Mod, data := Data}, - #s{shard = Shard, db = DBHandle, cf_generations = CFs} -) -> - DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), - Gen#{data := DB}. +%%-------------------------------------------------------------------------------- +%% Schema access +%%-------------------------------------------------------------------------------- --spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. -open_next_iterator(It = #it{shard = Shard, gen = GenId}) -> - open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}). +-spec generation_current(shard_id()) -> gen_id(). +generation_current(Shard) -> + #{current_generation := Current} = get_schema_runtime(Shard), + Current. -open_next_iterator(undefined, _It) -> - none; -open_next_iterator(Gen = #{}, It) -> - open_iterator(Gen, It). +-spec generation_get(shard_id(), gen_id()) -> generation(). +generation_get(Shard, GenId) -> + #{{generation, GenId} := GenData} = get_schema_runtime(Shard), + GenData. --spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. -open_iterator(#{module := Mod, data := Data}, It = #it{}) -> - Options = #{}, % TODO: passthrough options - case Mod:make_iterator(Data, It#it.replay, Options) of - {ok, ItData} -> - {ok, It#it{module = Mod, data = ItData}}; - Err -> - Err - end. - --spec open_restore_iterator(generation(), iterator(), binary()) -> - {ok, iterator()} | {error, _Reason}. -open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) -> - case Mod:restore_iterator(Data, Serial) of - {ok, ItData} -> - {ok, It#it{module = Mod, data = ItData}}; - Err -> - Err - end. - -%% - --define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). --define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin - <> = (KeyReplayState), - IteratorId -end). - --define(ITERATION_WRITE_OPTS, []). --define(ITERATION_READ_OPTS, []). - -iterator_get_state(Shard, ReplayID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS). - -iterator_put_state(ID, It = #it{shard = Shard}) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - Serial = preserve_iterator_state(It), - rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS). - -iterator_delete(Shard, ID) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS). - -preserve_iterator_state(#it{ - gen = Gen, - replay = {TopicFilter, StartTime}, - module = Mod, - data = ItData -}) -> - term_to_binary(#{ - v => 1, - gen => Gen, - filter => TopicFilter, - start => StartTime, - st => Mod:preserve_iterator(ItData) - }). - -restore_iterator_state(Shard, Serial) when is_binary(Serial) -> - restore_iterator_state(Shard, binary_to_term(Serial)); -restore_iterator_state( - Shard, - #{ - v := 1, - gen := Gen, - filter := TopicFilter, - start := StartTime, - st := State - } -) -> - It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, - open_restore_iterator(meta_get_gen(Shard, Gen), It, State). - -do_list_iterator_prefix(Shard, KeyPrefix) -> - Fn = fun(K0, _V, Acc) -> - K = ?KEY_REPLAY_STATE_PAT(K0), - [K | Acc] - end, - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). - -do_discard_iterator_prefix(Shard, KeyPrefix) -> - #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), - Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, - do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). - -do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> - #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), - case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of - {ok, It} -> - NextAction = {seek, KeyPrefix}, - do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); - Error -> - Error - end. - -do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> - case rocksdb:iterator_move(It, NextAction) of - {ok, K = <>, V} -> - NewAcc = Fn(K, V, Acc), - do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); - {ok, _K, _V} -> - ok = rocksdb:iterator_close(It), - {ok, Acc}; - {error, invalid_iterator} -> - ok = rocksdb:iterator_close(It), - {ok, Acc}; - Error -> - ok = rocksdb:iterator_close(It), - Error - end. - -%% Functions for dealing with the metadata stored persistently in rocksdb - --define(CURRENT_GEN, <<"current">>). --define(SCHEMA_WRITE_OPTS, []). --define(SCHEMA_READ_OPTS, []). - --spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). -schema_get_gen(DBHandle, GenId) -> - {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), - binary_to_term(Bin). - --spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. -schema_put_gen(DBHandle, GenId, Gen) -> - rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). - --spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined. -schema_get_current(DBHandle) -> - case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of - {ok, Bin} -> - binary_to_integer(Bin); - not_found -> - undefined - end. - --spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}. -schema_put_current(DBHandle, GenId) -> - rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS). - --spec schema_gen_key(integer()) -> binary(). -schema_gen_key(N) -> - <<"gen", N:32>>. - --undef(CURRENT_GEN). --undef(SCHEMA_WRITE_OPTS). --undef(SCHEMA_READ_OPTS). - -%% Functions for dealing with the runtime shard metadata: - --define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}). - --spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok. -meta_register_gen(Shard, GenId, Gen) -> - Gs = - case GenId > 0 of - true -> meta_lookup(Shard, GenId - 1); - false -> [] +-spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()]. +generations_since(Shard, Since) -> + Schema = get_schema_runtime(Shard), + maps:fold( + fun + ({generation, GenId}, #{until := Until}, Acc) when Until >= Since -> + [GenId | Acc]; + (_K, _V, Acc) -> + Acc end, - ok = meta_put(Shard, GenId, [Gen | Gs]), - ok = meta_put(Shard, current, GenId). + [], + Schema + ). --spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}. -meta_lookup_gen(Shard, Time) -> - %% TODO - %% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning - %% towards a "no". - Current = meta_lookup(Shard, current), - Gens = meta_lookup(Shard, Current), - find_gen(Time, Current, Gens). +-define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> - {GenId, Gen}; -find_gen(Time, GenId, [_Gen | Rest]) -> - find_gen(Time, GenId - 1, Rest). +-spec get_schema_runtime(shard_id()) -> shard(). +get_schema_runtime(Shard) -> + persistent_term:get(?PERSISTENT_TERM(Shard)). --spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined. -meta_get_gen(Shard, GenId) -> - case meta_lookup(Shard, GenId, []) of - [Gen | _Older] -> Gen; - [] -> undefined - end. +-spec put_schema_runtime(shard_id(), shard()) -> ok. +put_schema_runtime(Shard, RuntimeSchema) -> + persistent_term:put(?PERSISTENT_TERM(Shard), RuntimeSchema), + ok. --spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined. -meta_get_current(Shard) -> - meta_lookup(Shard, current, undefined). - --spec meta_lookup(emqx_ds:shard(), _K) -> _V. -meta_lookup(Shard, K) -> - persistent_term:get(?PERSISTENT_TERM(Shard, K)). - --spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default. -meta_lookup(Shard, K, Default) -> - persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). - --spec meta_put(emqx_ds:shard(), _K, _V) -> ok. -meta_put(Shard, K, V) -> - persistent_term:put(?PERSISTENT_TERM(Shard, K), V). - --spec meta_erase(emqx_ds:shard()) -> ok. -meta_erase(Shard) -> - [ - persistent_term:erase(K) - || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard - ], +-spec erase_schema_runtime(shard_id()) -> ok. +erase_schema_runtime(Shard) -> + persistent_term:erase(?PERSISTENT_TERM(Shard)), ok. -undef(PERSISTENT_TERM). -get_next_id(undefined) -> 0; -get_next_id(GenId) -> GenId + 1. +-define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>). -is_gen_valid(Shard, GenId, Since) when GenId > 0 -> - [GenPrev | _] = meta_lookup(Shard, GenId - 1), - case GenPrev of - #{since := SincePrev} when Since > SincePrev -> - ok; - #{} -> - {error, nonmonotonic} - end; -is_gen_valid(_Shard, 0, 0) -> - ok. +-spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found. +get_schema_persistent(DB) -> + case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of + {ok, Blob} -> + Schema = binary_to_term(Blob), + %% Sanity check: + #{current_generation := _, default_generation_module := _} = Schema, + Schema; + not_found -> + not_found + end. -serialize(Msg) -> - %% TODO: remove topic, GUID, etc. from the stored - %% message. Reconstruct it from the metadata. - term_to_binary(emqx_message:to_map(Msg)). +-spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok. +put_schema_persistent(DB, Schema) -> + Blob = term_to_binary(Schema), + rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []). -deserialize(Bin) -> - emqx_message:from_map(binary_to_term(Bin)). - - -%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. -%% store_cfs(DBHandle, CFRefs) -> -%% lists:foreach( -%% fun({CFName, CFRef}) -> -%% persistent_term:put({self(), CFName}, {DBHandle, CFRef}) -%% end, -%% CFRefs). +-undef(ROCKSDB_SCHEMA_KEY). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ new file mode 100644 index 000000000..32f18d18b --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl_ @@ -0,0 +1,714 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_storage_layer). + +-behaviour(gen_server). + +%% API: +-export([start_link/2]). +-export([create_generation/3]). + +-export([open_shard/2, get_streams/3]). +-export([message_store/3]). +-export([delete/4]). + +-export([make_iterator/3, next/1, next/2]). + +-export([ + preserve_iterator/2, + restore_iterator/2, + discard_iterator/2, + ensure_iterator/3, + discard_iterator_prefix/2, + list_iterator_prefix/2, + foldl_iterator_prefix/4 +]). + +%% gen_server callbacks: +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]). +-export_type([db_options/0, db_write_options/0, db_read_options/0]). + +-compile({inline, [meta_lookup/2]}). + +-include_lib("emqx/include/emqx.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-type options() :: #{ + dir => file:filename() +}. + +%% see rocksdb:db_options() +-type db_options() :: proplists:proplist(). +%% see rocksdb:write_options() +-type db_write_options() :: proplists:proplist(). +%% see rocksdb:read_options() +-type db_read_options() :: proplists:proplist(). + +-type cf_refs() :: [{string(), rocksdb:cf_handle()}]. + +%% Message storage generation +%% Keep in mind that instances of this type are persisted in long-term storage. +-type generation() :: #{ + %% Module that handles data for the generation + module := module(), + %% Module-specific data defined at generation creation time + data := term(), + %% When should this generation become active? + %% This generation should only contain messages timestamped no earlier than that. + %% The very first generation will have `since` equal 0. + since := emqx_ds:time() +}. + +-record(s, { + shard :: emqx_ds:shard(), + keyspace :: emqx_ds_conf:keyspace(), + db :: rocksdb:db_handle(), + cf_iterator :: rocksdb:cf_handle(), + cf_generations :: cf_refs() +}). + +-record(stream, + { generation :: gen_id() + , topic_filter :: emqx_ds:topic_filter() + , since :: emqx_ds:time() + , enc :: _EncapsultatedData + }). + +-opaque stream() :: #stream{}. + +-record(it, { + shard :: emqx_ds:shard(), + gen :: gen_id(), + replay :: emqx_ds:replay(), + module :: module(), + data :: term() +}). + +-type gen_id() :: 0..16#ffff. + +-opaque state() :: #s{}. +-opaque iterator() :: #it{}. + +%% Contents of the default column family: +%% +%% [{<<"genNN">>, #generation{}}, ..., +%% {<<"current">>, GenID}] + +-define(DEFAULT_CF, "default"). +-define(DEFAULT_CF_OPTS, []). + +-define(ITERATOR_CF, "$iterators"). + +%% TODO +%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`. +%% 2. Supposedly might be compressed _very_ effectively. +%% 3. `inplace_update_support`? +-define(ITERATOR_CF_OPTS, []). + +-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). + +%%================================================================================ +%% Callbacks +%%================================================================================ + +-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) -> + {_Schema, cf_refs()}. + +-callback open( + emqx_ds:shard(), + rocksdb:db_handle(), + gen_id(), + cf_refs(), + _Schema +) -> + _DB. + +-callback store( + _DB, + _MessageID :: binary(), + emqx_ds:time(), + emqx_ds:topic(), + _Payload :: binary() +) -> + ok | {error, _}. + +-callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) -> + ok | {error, _}. + +-callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) -> + [{_TopicRankX, _Stream}]. + +-callback make_iterator(_DB, emqx_ds:replay()) -> + {ok, _It} | {error, _}. + +-callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}. + +-callback preserve_iterator(_It) -> term(). + +-callback next(It) -> {value, binary(), It} | none | {error, closed}. + +%%================================================================================ +%% Replication layer API +%%================================================================================ + +-spec open_shard(emqx_ds_replication_layer:shard(), emqx_ds_storage_layer:options()) -> ok. +open_shard(Shard, Options) -> + emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). + +-spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), _Stream}]. +get_streams(Shard, TopicFilter, StartTime) -> + %% TODO: lookup ALL generations + {GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, StartTime), + lists:map( + fun({RankX, ModStream}) -> + Stream = #stream{ generation = GenId + , topic_filter = TopicFilter + , since = StartTime + , enc = ModStream + }, + Rank = {RankX, GenId}, + {Rank, Stream} + end, + Mod:get_streams(ModState, TopicFilter, StartTime)). + +-spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) -> + {ok, _MessageId} | {error, _}. +message_store(Shard, Msgs, _Opts) -> + {ok, lists:map( + fun(Msg) -> + GUID = emqx_message:id(Msg), + Timestamp = Msg#message.timestamp, + {_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp), + Topic = emqx_topic:words(emqx_message:topic(Msg)), + Payload = serialize(Msg), + Mod:store(ModState, GUID, Timestamp, Topic, Payload), + GUID + end, + Msgs)}. + +-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream. +next(It = #it{}) -> + next(It, _BatchSize = 1). + +-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream. +next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) -> + end_of_stream; +next( + It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize +) -> + #{data := DBData} = meta_get_gen(Shard, Gen), + {ok, ItData} = Mod:restore_iterator(DBData, Serialized), + next(It#it{data = ItData}, BatchSize); +next(It = #it{}, BatchSize) -> + do_next(It, BatchSize, _Acc = []). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec create_generation( + emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() +) -> + {ok, gen_id()} | {error, nonmonotonic}. +create_generation(ShardId, Since, Config = {_Module, _Options}) -> + gen_server:call(?REF(ShardId), {create_generation, Since, Config}). + +-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) -> + ok | {error, _}. +delete(Shard, GUID, Time, Topic) -> + {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), + Mod:delete(Data, GUID, Time, Topic). + +-spec make_iterator(emqx_ds:shard(), stream(), emqx_ds:time()) -> + {ok, iterator()} | {error, _TODO}. +make_iterator(Shard, Stream, StartTime) -> + #stream{ topic_filter = TopicFilter + , since = Since + , enc = Enc + } = Stream, + {GenId, Gen} = meta_lookup_gen(Shard, StartTime), + Replay = {TopicFilter, Since}, + case Mod:make_iterator(Data, Replay, Options) of + #it{ gen = GenId, + replay = {TopicFilter, Since} + }. + +-spec do_next(iterator(), non_neg_integer(), [binary()]) -> + {ok, iterator(), [binary()]} | end_of_stream. +do_next(It, N, Acc) when N =< 0 -> + {ok, It, lists:reverse(Acc)}; +do_next(It = #it{module = Mod, data = ItData}, N, Acc) -> + case Mod:next(ItData) of + {value, Bin, ItDataNext} -> + Val = deserialize(Bin), + do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]); + {error, _} = _Error -> + %% todo: log? + %% iterator might be invalid now; will need to re-open it. + Serialized = Mod:preserve_iterator(ItData), + {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; + none -> + case open_next_iterator(It) of + {ok, ItNext} -> + do_next(ItNext, N, Acc); + {error, _} = _Error -> + %% todo: log? + %% fixme: only bad options may lead to this? + %% return an "empty" iterator to be re-opened when retrying? + Serialized = Mod:preserve_iterator(ItData), + {ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)}; + none -> + case Acc of + [] -> + end_of_stream; + _ -> + {ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)} + end + end + end. + +-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> + ok | {error, _TODO}. +preserve_iterator(It = #it{}, IteratorID) -> + iterator_put_state(IteratorID, It). + +-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> + {ok, iterator()} | {error, _TODO}. +restore_iterator(Shard, ReplayID) -> + case iterator_get_state(Shard, ReplayID) of + {ok, Serial} -> + restore_iterator_state(Shard, Serial); + not_found -> + {error, not_found}; + {error, _Reason} = Error -> + Error + end. + +-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> + {ok, iterator()} | {error, _TODO}. +ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> + case restore_iterator(Shard, IteratorID) of + {ok, It} -> + {ok, It}; + {error, not_found} -> + {ok, It} = make_iterator(Shard, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + {ok, It}; + Error -> + Error + end. + +-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> + ok | {error, _TODO}. +discard_iterator(Shard, ReplayID) -> + iterator_delete(Shard, ReplayID). + +-spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> + ok | {error, _TODO}. +discard_iterator_prefix(Shard, KeyPrefix) -> + case do_discard_iterator_prefix(Shard, KeyPrefix) of + {ok, _} -> ok; + Error -> Error + end. + +-spec list_iterator_prefix( + emqx_ds:shard(), + binary() +) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. +list_iterator_prefix(Shard, KeyPrefix) -> + do_list_iterator_prefix(Shard, KeyPrefix). + +-spec foldl_iterator_prefix( + emqx_ds:shard(), + binary(), + fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), + Acc +) -> {ok, Acc} | {error, _TODO} when + Acc :: term(). +foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). + +%%================================================================================ +%% gen_server +%%================================================================================ + +-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> + {ok, pid()}. +start_link(Shard, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). + +init({Shard, Options}) -> + process_flag(trap_exit, true), + {ok, S0} = do_open_db(Shard, Options), + S = ensure_current_generation(S0), + ok = populate_metadata(S), + {ok, S}. + +handle_call({create_generation, Since, Config}, _From, S) -> + case create_new_gen(Since, Config, S) of + {ok, GenId, NS} -> + {reply, {ok, GenId}, NS}; + {error, _} = Error -> + {reply, Error, S} + end; +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #s{db = DB, shard = Shard}) -> + meta_erase(Shard), + ok = rocksdb:close(DB). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). + +-spec populate_metadata(state()) -> ok. +populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) -> + ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}), + Current = schema_get_current(DBHandle), + lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). + +-spec populate_metadata(gen_id(), state()) -> ok. +populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> + Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), + meta_register_gen(Shard, GenId, Gen). + +-spec ensure_current_generation(state()) -> state(). +ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) -> + case schema_get_current(DBHandle) of + undefined -> + Config = emqx_ds_conf:keyspace_config(Keyspace), + {ok, _, NS} = create_new_gen(0, Config, S), + NS; + _GenId -> + S + end. + +-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> + {ok, gen_id(), state()} | {error, nonmonotonic}. +create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) -> + GenId = get_next_id(meta_get_current(Shard)), + GenId = get_next_id(schema_get_current(DBHandle)), + case is_gen_valid(Shard, GenId, Since) of + ok -> + {ok, Gen, NS} = create_gen(GenId, Since, Config, S), + %% TODO: Transaction? Column family creation can't be transactional, anyway. + ok = schema_put_gen(DBHandle, GenId, Gen), + ok = schema_put_current(DBHandle, GenId), + ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)), + {ok, GenId, NS}; + {error, _} = Error -> + Error + end. + +-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) -> + {ok, generation(), state()}. +create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) -> + % TODO: Backend implementation should ensure idempotency. + {Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options), + Gen = #{ + module => Module, + data => Schema, + since => Since + }, + {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. + +-spec do_open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +do_open_db(Shard, Options) -> + DefaultDir = binary_to_list(Shard), + DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), + %% TODO: properly forward keyspace + Keyspace = maps:get(keyspace, Options, default_keyspace), + DBOptions = [ + {create_if_missing, true}, + {create_missing_column_families, true} + | emqx_ds_conf:db_options(Keyspace) + ], + _ = filelib:ensure_dir(DBDir), + ExistingCFs = + case rocksdb:list_column_families(DBDir, DBOptions) of + {ok, CFs} -> + [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; + % DB is not present. First start + {error, {db_open, _}} -> + [] + end, + ColumnFamilies = [ + {?DEFAULT_CF, ?DEFAULT_CF_OPTS}, + {?ITERATOR_CF, ?ITERATOR_CF_OPTS} + | ExistingCFs + ], + case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of + {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> + {CFNames, _} = lists:unzip(ExistingCFs), + {ok, #s{ + shard = Shard, + keyspace = Keyspace, + db = DBHandle, + cf_iterator = CFIterator, + cf_generations = lists:zip(CFNames, CFRefs) + }}; + Error -> + Error + end. + +-spec open_gen(gen_id(), generation(), state()) -> generation(). +open_gen( + GenId, + Gen = #{module := Mod, data := Data}, + #s{shard = Shard, db = DBHandle, cf_generations = CFs} +) -> + DB = Mod:open(Shard, DBHandle, GenId, CFs, Data), + Gen#{data := DB}. + +-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. +open_next_iterator(It = #it{shard = Shard, gen = GenId}) -> + open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}). + +open_next_iterator(undefined, _It) -> + none; +open_next_iterator(Gen = #{}, It) -> + open_iterator(Gen, It). + +-spec open_restore_iterator(generation(), iterator(), binary()) -> + {ok, iterator()} | {error, _Reason}. +open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) -> + case Mod:restore_iterator(Data, Serial) of + {ok, ItData} -> + {ok, It#it{module = Mod, data = ItData}}; + Err -> + Err + end. + +%% + +-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). +-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin + <> = (KeyReplayState), + IteratorId +end). + +-define(ITERATION_WRITE_OPTS, []). +-define(ITERATION_READ_OPTS, []). + +iterator_get_state(Shard, ReplayID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS). + +iterator_put_state(ID, It = #it{shard = Shard}) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + Serial = preserve_iterator_state(It), + rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS). + +iterator_delete(Shard, ID) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS). + +preserve_iterator_state(#it{ + gen = Gen, + replay = {TopicFilter, StartTime}, + module = Mod, + data = ItData +}) -> + term_to_binary(#{ + v => 1, + gen => Gen, + filter => TopicFilter, + start => StartTime, + st => Mod:preserve_iterator(ItData) + }). + +restore_iterator_state(Shard, Serial) when is_binary(Serial) -> + restore_iterator_state(Shard, binary_to_term(Serial)); +restore_iterator_state( + Shard, + #{ + v := 1, + gen := Gen, + filter := TopicFilter, + start := StartTime, + st := State + } +) -> + It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, + open_restore_iterator(meta_get_gen(Shard, Gen), It, State). + +do_list_iterator_prefix(Shard, KeyPrefix) -> + Fn = fun(K0, _V, Acc) -> + K = ?KEY_REPLAY_STATE_PAT(K0), + [K | Acc] + end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). + +do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), + Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). + +do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of + {ok, It} -> + NextAction = {seek, KeyPrefix}, + do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); + Error -> + Error + end. + +do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> + case rocksdb:iterator_move(It, NextAction) of + {ok, K = <>, V} -> + NewAcc = Fn(K, V, Acc), + do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); + {ok, _K, _V} -> + ok = rocksdb:iterator_close(It), + {ok, Acc}; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(It), + {ok, Acc}; + Error -> + ok = rocksdb:iterator_close(It), + Error + end. + +%% Functions for dealing with the metadata stored persistently in rocksdb + +-define(CURRENT_GEN, <<"current">>). +-define(SCHEMA_WRITE_OPTS, []). +-define(SCHEMA_READ_OPTS, []). + +-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation(). +schema_get_gen(DBHandle, GenId) -> + {ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS), + binary_to_term(Bin). + +-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}. +schema_put_gen(DBHandle, GenId, Gen) -> + rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS). + +-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined. +schema_get_current(DBHandle) -> + case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of + {ok, Bin} -> + binary_to_integer(Bin); + not_found -> + undefined + end. + +-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}. +schema_put_current(DBHandle, GenId) -> + rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS). + +-spec schema_gen_key(integer()) -> binary(). +schema_gen_key(N) -> + <<"gen", N:32>>. + +-undef(CURRENT_GEN). +-undef(SCHEMA_WRITE_OPTS). +-undef(SCHEMA_READ_OPTS). + +%% Functions for dealing with the runtime shard metadata: + +-define(PERSISTENT_TERM(SHARD, GEN), {emqx_ds_storage_layer, SHARD, GEN}). + +-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok. +meta_register_gen(Shard, GenId, Gen) -> + Gs = + case GenId > 0 of + true -> meta_lookup(Shard, GenId - 1); + false -> [] + end, + ok = meta_put(Shard, GenId, [Gen | Gs]), + ok = meta_put(Shard, current, GenId). + +-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}. +meta_lookup_gen(Shard, Time) -> + %% TODO + %% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning + %% towards a "no". + Current = meta_lookup(Shard, current), + Gens = meta_lookup(Shard, Current), + find_gen(Time, Current, Gens). + +find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since -> + {GenId, Gen}; +find_gen(Time, GenId, [_Gen | Rest]) -> + find_gen(Time, GenId - 1, Rest). + +-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined. +meta_get_gen(Shard, GenId) -> + case meta_lookup(Shard, GenId, []) of + [Gen | _Older] -> Gen; + [] -> undefined + end. + +-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined. +meta_get_current(Shard) -> + meta_lookup(Shard, current, undefined). + +-spec meta_lookup(emqx_ds:shard(), _K) -> _V. +meta_lookup(Shard, Key) -> + persistent_term:get(?PERSISTENT_TERM(Shard, Key)). + +-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default. +meta_lookup(Shard, K, Default) -> + persistent_term:get(?PERSISTENT_TERM(Shard, K), Default). + +-spec meta_put(emqx_ds:shard(), _K, _V) -> ok. +meta_put(Shard, K, V) -> + persistent_term:put(?PERSISTENT_TERM(Shard, K), V). + +-spec meta_erase(emqx_ds:shard()) -> ok. +meta_erase(Shard) -> + [ + persistent_term:erase(K) + || {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard + ], + ok. + +-undef(PERSISTENT_TERM). + +get_next_id(undefined) -> 0; +get_next_id(GenId) -> GenId + 1. + +is_gen_valid(Shard, GenId, Since) when GenId > 0 -> + [GenPrev | _] = meta_lookup(Shard, GenId - 1), + case GenPrev of + #{since := SincePrev} when Since > SincePrev -> + ok; + #{} -> + {error, nonmonotonic} + end; +is_gen_valid(_Shard, 0, 0) -> + ok. + +serialize(Msg) -> + %% TODO: remove topic, GUID, etc. from the stored + %% message. Reconstruct it from the metadata. + term_to_binary(emqx_message:to_map(Msg)). + +deserialize(Bin) -> + emqx_message:from_map(binary_to_term(Bin)). + + +%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok. +%% store_cfs(DBHandle, CFRefs) -> +%% lists:foreach( +%% fun({CFName, CFRef}) -> +%% persistent_term:put({self(), CFName}, {DBHandle, CFRef}) +%% end, +%% CFRefs). diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ similarity index 98% rename from apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl rename to apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ index 3290b03e6..bdf5a1453 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_bitmask.erl_ @@ -83,15 +83,11 @@ -export([create_new/3, open/5]). -export([make_keymapper/1]). --export([store/5]). --export([delete/4]). +-export([store/5, delete/4]). --export([get_streams/2]). --export([make_iterator/3, next/1]). +-export([get_streams/3, make_iterator/3, next/1]). --export([preserve_iterator/1]). --export([restore_iterator/2]). --export([refresh_iterator/1]). +-export([preserve_iterator/1, restore_iterator/2, refresh_iterator/1]). %% Debug/troubleshooting: %% Keymappers @@ -131,7 +127,7 @@ %% Type declarations %%================================================================================ --opaque stream() :: singleton_stream. +-opaque stream() :: emqx_ds:topic_filter(). -type topic() :: emqx_ds:topic(). -type topic_filter() :: emqx_ds:topic_filter(). @@ -290,10 +286,10 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options). --spec get_streams(db(), emqx_ds:reply()) -> +-spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [stream()]. -get_streams(_, _) -> - [singleton_stream]. +get_streams(_, TopicFilter, _) -> + [{0, TopicFilter}]. -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index 2e4f56f10..bf73e3ac8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/2, stop_shard/1]). +-export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]). %% behaviour callbacks: -export([init/1]). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl new file mode 100644 index 000000000..1fbad5f1b --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -0,0 +1,136 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Reference implementation of the storage. +%% +%% Trivial, extremely slow and inefficient. It also doesn't handle +%% restart of the Erlang node properly, so obviously it's only to be +%% used for testing. +-module(emqx_ds_storage_reference). + +-behavior(emqx_ds_storage_layer). + +%% API: +-export([]). + +%% behavior callbacks: +-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/4, next/4]). + +%% internal exports: +-export([]). + +-export_type([]). + +-include_lib("emqx/include/emqx.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%% Permanent state: +-record(schema, {}). + +%% Runtime state: +-record(s, { + db :: rocksdb:db_handle(), + cf :: rocksdb:cf_handle() +}). + +-record(stream, {topic_filter :: emqx_ds:topic_filter()}). + +-record(it, { + topic_filter :: emqx_ds:topic_filter(), + start_time :: emqx_ds:time(), + last_seen_message_key = first :: binary() | first +}). + +%%================================================================================ +%% API funcions +%%================================================================================ + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +create(_ShardId, DBHandle, GenId, _Options) -> + CFName = data_cf(GenId), + {ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []), + Schema = #schema{}, + {Schema, [{CFName, CFHandle}]}. + +open(_Shard, DBHandle, GenId, CFRefs, #schema{}) -> + {_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs), + #s{db = DBHandle, cf = CF}. + +store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> + lists:foreach( + fun(Msg) -> + Id = erlang:unique_integer([monotonic]), + Key = <>, + Val = term_to_binary(Msg), + rocksdb:put(DB, CF, Key, Val, []) + end, + Messages + ). + +get_streams(_Shard, _Data, TopicFilter, _StartTime) -> + [#stream{topic_filter = TopicFilter}]. + +make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) -> + {ok, #it{ + topic_filter = TopicFilter, + start_time = StartTime + }}. + +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> + #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, + {ok, ITHandle} = rocksdb:iterator(DB, CF, []), + Action = case Key0 of + first -> + first; + _ -> + rocksdb:iterator_move(ITHandle, Key0), + next + end, + {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []), + rocksdb:iterator_close(ITHandle), + It = It0#it{last_seen_message_key = Key}, + {ok, It, lists:reverse(Messages)}. + +%%================================================================================ +%% Internal functions +%%================================================================================ + +do_next(_, _, _, _, 0, Key, Acc) -> + {Key, Acc}; +do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) -> + case rocksdb:iterator_move(IT, Action) of + {ok, Key, Blob} -> + Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob), + case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of + true -> + do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]); + false -> + do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc) + end; + {error, invalid_iterator} -> + {Key0, Acc} + end. + +%% @doc Generate a column family ID for the MQTT messages +-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. +data_cf(GenId) -> + ?MODULE_STRING ++ integer_to_list(GenId). diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 79285fe16..df3d64bc3 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -17,8 +17,9 @@ -behavior(emqx_bpapi). +-include_lib("emqx/include/bpapi.hrl"). %% API: --export([]). +-export([open_shard/3, get_streams/4, make_iterator/4, next/4]). %% behavior callbacks: -export([introduced_in/0]). @@ -27,23 +28,29 @@ %% API funcions %%================================================================================ --spec create_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) -> - ok. -create_shard(Node, Shard, Opts) -> - erpc:call(Node, emqx_ds_replication_layer, do_create_shard_v1, [Shard, Opts]). +-spec open_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) -> + ok. +open_shard(Node, Shard, Opts) -> + erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]). --spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> - [emqx_ds_replication_layer:stream()]. +-spec get_streams( + node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time() +) -> + [{integer(), emqx_ds_replication_layer:stream()}]. get_streams(Node, Shard, TopicFilter, Time) -> erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). --spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) -> - {ok, emqx_ds_replication_layer:iterator()} | {error, _}. -open_iterator(Node, Shard, Stream, StartTime) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]). +-spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:time()) -> + {ok, emqx_ds_replication_layer:iterator()} | {error, _}. +make_iterator(Node, Shard, Stream, StartTime) -> + erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, StartTime]). --spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()) -> - {ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} | end_of_stream. +-spec next( + node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer() +) -> + {ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} + | {ok, end_of_stream} + | {error, _}. next(Node, Shard, Iter, BatchSize) -> erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl new file mode 100644 index 000000000..effe3b695 --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -0,0 +1,107 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +%% A simple smoke test that verifies that opening the DB doesn't crash +t_00_smoke_open(_Config) -> + ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})), + ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})). + +%% A simple smoke test that verifies that storing the messages doesn't +%% crash +t_01_smoke_store(_Config) -> + DB = <<"default">>, + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + Msg = message(<<"foo/bar">>, <<"foo">>, 0), + ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])). + +%% A simple smoke test that verifies that getting the list of streams +%% doesn't crash and that iterators can be opened. +t_02_smoke_get_streams_start_iter(_Config) -> + DB = <<"default">>, + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + StartTime = 0, + [{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), + ?assertMatch({_, _}, Rank), + ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, StartTime)). + +%% A simple smoke test that verifies that it's possible to iterate +%% over messages. +t_03_smoke_iterate(_Config) -> + DB = atom_to_binary(?FUNCTION_NAME), + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + StartTime = 0, + Msgs = [ + message(<<"foo/bar">>, <<"1">>, 0), + message(<<"foo">>, <<"2">>, 1), + message(<<"bar/bar">>, <<"3">>, 2) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + [{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), + {ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime), + {ok, Iter, Batch} = iterate(Iter0, 1), + ?assertEqual(Msgs, Batch, {Iter0, Iter}). + +message(Topic, Payload, PublishedAt) -> + #message{ + topic = Topic, + payload = Payload, + timestamp = PublishedAt, + id = emqx_guid:gen() + }. + +iterate(It, BatchSize) -> + iterate(It, BatchSize, []). + +iterate(It0, BatchSize, Acc) -> + case emqx_ds:next(It0, BatchSize) of + {ok, It, []} -> + {ok, It, Acc}; + {ok, It, Msgs} -> + iterate(It, BatchSize, Acc ++ Msgs); + Ret -> + Ret + end. + +%% CT callbacks + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [mria, emqx_durable_storage], + #{work_dir => ?config(priv_dir, Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +init_per_testcase(_TC, Config) -> + snabbkaffe:fix_ct_logging(), + application:ensure_all_started(emqx_durable_storage), + Config. + +end_per_testcase(_TC, _Config) -> + ok = application:stop(emqx_durable_storage). diff --git a/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl_ similarity index 100% rename from apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_message_storage_bitmask_SUITE.erl_ diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl_ similarity index 100% rename from apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl rename to apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl_ diff --git a/scripts/check-elixir-applications.exs b/scripts/check-elixir-applications.exs index 42c838199..1e604c69f 100755 --- a/scripts/check-elixir-applications.exs +++ b/scripts/check-elixir-applications.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirApplications do alias EMQXUmbrella.MixProject diff --git a/scripts/check-elixir-deps-discrepancies.exs b/scripts/check-elixir-deps-discrepancies.exs index 408079d7d..1363219ed 100755 --- a/scripts/check-elixir-deps-discrepancies.exs +++ b/scripts/check-elixir-deps-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir # ensure we have a fresh rebar.lock diff --git a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs index d07e6978f..9ffdc47bf 100755 --- a/scripts/check-elixir-emqx-machine-boot-discrepancies.exs +++ b/scripts/check-elixir-emqx-machine-boot-discrepancies.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir defmodule CheckElixirEMQXMachineBootDiscrepancies do alias EMQXUmbrella.MixProject diff --git a/scripts/check_missing_reboot_apps.exs b/scripts/check_missing_reboot_apps.exs index 91d4b39ea..7f2178ec1 100755 --- a/scripts/check_missing_reboot_apps.exs +++ b/scripts/check_missing_reboot_apps.exs @@ -1,4 +1,4 @@ -#!/usr/bin/env elixir +#! /usr/bin/env elixir alias EMQXUmbrella.MixProject