refactor(ds): Passthrough open_db and get_channels to storage layer

This commit is contained in:
ieQu1 2023-10-03 17:13:16 +02:00
parent 59d01dc823
commit c6a721a7eb
20 changed files with 1683 additions and 991 deletions

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ds_SUITE). -module(emqx_persistent_session_ds_SUITE).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).

View File

@ -27,10 +27,6 @@
]). ]).
%% FIXME %% FIXME
-define(DS_SHARD_ID, <<"local">>).
-define(DEFAULT_KEYSPACE, default).
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
-define(WHEN_ENABLED(DO), -define(WHEN_ENABLED(DO),
case is_store_enabled() of case is_store_enabled() of
true -> DO; true -> DO;
@ -42,9 +38,9 @@
init() -> init() ->
?WHEN_ENABLED(begin ?WHEN_ENABLED(begin
ok = emqx_ds:create_db(<<"default">>, #{}), ok = emqx_ds:open_db(<<"default">>, #{}),
ok = emqx_persistent_session_ds_router:init_tables(), ok = emqx_persistent_session_ds_router:init_tables(),
ok = emqx_persistent_session_ds:create_tables(), %ok = emqx_persistent_session_ds:create_tables(),
ok ok
end). end).
@ -70,7 +66,7 @@ needs_persistence(Msg) ->
not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)). not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
store_message(Msg) -> store_message(Msg) ->
emqx_ds:message_store([Msg]). emqx_ds:store_batch([Msg]).
has_subscribers(#message{topic = Topic}) -> has_subscribers(#message{topic = Topic}) ->
emqx_persistent_session_ds_router:has_any_route(Topic). emqx_persistent_session_ds_router:has_any_route(Topic).

View File

@ -62,22 +62,6 @@
-export([session_open/1]). -export([session_open/1]).
-endif. -endif.
%% RPC
-export([
ensure_iterator_closed_on_all_shards/1,
ensure_all_iterators_closed/1
]).
-export([
do_open_iterator/3,
do_ensure_iterator_closed/1,
do_ensure_all_iterators_closed/1
]).
%% FIXME
-define(DS_SHARD_ID, atom_to_binary(node())).
-define(DEFAULT_KEYSPACE, default).
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
%% an atom, in theory (?). %% an atom, in theory (?).
-type id() :: binary(). -type id() :: binary().
@ -157,7 +141,6 @@ destroy(#{clientid := ClientID}) ->
destroy_session(ClientID). destroy_session(ClientID).
destroy_session(ClientID) -> destroy_session(ClientID) ->
_ = ensure_all_iterators_closed(ClientID),
session_drop(ClientID). session_drop(ClientID).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -410,9 +393,9 @@ open_iterator_on_all_shards(TopicFilter, Iterator) ->
%% RPC target. %% RPC target.
-spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> -spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) ->
{ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}. {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}.
do_open_iterator(TopicFilter, StartMS, IteratorID) -> do_open_iterator(TopicFilter, StartMS, _IteratorID) ->
Replay = {TopicFilter, StartMS}, %% TODO: wrong
emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay). {ok, emqx_ds:make_iterator(TopicFilter, StartMS)}.
-spec del_subscription(topic(), iterator(), id()) -> -spec del_subscription(topic(), iterator(), id()) ->
ok. ok.
@ -420,49 +403,8 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) ->
% N.B.: see comments in `?MODULE:add_subscription' for a discussion about the % N.B.: see comments in `?MODULE:add_subscription' for a discussion about the
% order of operations here. % order of operations here.
TopicFilter = emqx_topic:words(TopicFilterBin), TopicFilter = emqx_topic:words(TopicFilterBin),
Ctx = #{iterator_id => IteratorID},
?tp_span(
persistent_session_ds_close_iterators,
Ctx,
ok = ensure_iterator_closed_on_all_shards(IteratorID)
),
?tp_span(
persistent_session_ds_iterator_delete,
Ctx,
session_del_iterator(DSSessionID, TopicFilter)
),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID). ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID).
-spec ensure_iterator_closed_on_all_shards(emqx_ds:iterator_id()) -> ok.
ensure_iterator_closed_on_all_shards(IteratorID) ->
%% Note: currently, shards map 1:1 to nodes, but this will change in the future.
Nodes = emqx:running_nodes(),
Results = emqx_persistent_session_ds_proto_v1:close_iterator(Nodes, IteratorID),
%% TODO: handle errors
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
ok.
%% RPC target.
-spec do_ensure_iterator_closed(emqx_ds:iterator_id()) -> ok.
do_ensure_iterator_closed(IteratorID) ->
ok = emqx_ds_storage_layer:discard_iterator(?DS_SHARD, IteratorID),
ok.
-spec ensure_all_iterators_closed(id()) -> ok.
ensure_all_iterators_closed(DSSessionID) ->
%% Note: currently, shards map 1:1 to nodes, but this will change in the future.
Nodes = emqx:running_nodes(),
Results = emqx_persistent_session_ds_proto_v1:close_all_iterators(Nodes, DSSessionID),
%% TODO: handle errors
true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results),
ok.
%% RPC target.
-spec do_ensure_all_iterators_closed(id()) -> ok.
do_ensure_all_iterators_closed(DSSessionID) ->
ok = emqx_ds_storage_layer:discard_iterator_prefix(?DS_SHARD, DSSessionID),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Session tables operations %% Session tables operations
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -56,14 +56,16 @@ open_iterator(Nodes, TopicFilter, StartMS, IteratorID) ->
emqx_ds:keyspace(), emqx_ds:keyspace(),
emqx_ds:shard_id(), emqx_ds:shard_id(),
emqx_ds:topic_filter(), emqx_ds:topic_filter(),
emqx_ds:time()) -> emqx_ds:time()
) ->
[emqx_ds_storage_layer:stream()]. [emqx_ds_storage_layer:stream()].
get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime) -> get_streams(Node, Keyspace, ShardId, TopicFilter, StartTime) ->
erpc:call( erpc:call(
Node, Node,
emqx_ds_storage_layer, emqx_ds_storage_layer,
get_streams, get_streams,
[Keyspace, ShardId, TopicFilter, StartTime]). [Keyspace, ShardId, TopicFilter, StartTime]
).
-spec close_iterator( -spec close_iterator(
[node()], [node()],

View File

@ -13,31 +13,44 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Main interface module for `emqx_durable_storage' application.
%%
%% It takes care of forwarding calls to the underlying DBMS. Currently
%% only the embedded `emqx_ds_replication_layer' storage is supported,
%% so all the calls are simply passed through.
-module(emqx_ds). -module(emqx_ds).
%% Management API: %% Management API:
-export([open_db/2]). -export([open_db/2]).
%% Message storage API: %% Message storage API:
-export([message_store/1, message_store/2, message_store/3]). -export([store_batch/1, store_batch/2, store_batch/3]).
%% Message replay API: %% Message replay API:
-export([get_streams/3, open_iterator/2, next/2]). -export([get_streams/3, make_iterator/2, next/2]).
%% internal exports: %% Misc. API:
-export([]). -export([]).
-export_type([db/0, time/0, topic_filter/0, topic/0]). -export_type([
db/0,
time/0,
topic_filter/0,
topic/0,
stream/0,
stream_rank/0,
iterator/0,
next_result/1, next_result/0,
store_batch_result/0,
make_iterator_result/1, make_iterator_result/0
]).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
%% Different DBs are completely independent from each other. They -type db() :: emqx_ds_replication_layer:db().
%% could represent something like different tenants.
%%
%% Topics stored in different DBs aren't necesserily disjoint.
-type db() :: binary().
%% Parsed topic. %% Parsed topic.
-type topic() :: list(binary()). -type topic() :: list(binary()).
@ -45,30 +58,22 @@
%% Parsed topic filter. %% Parsed topic filter.
-type topic_filter() :: list(binary() | '+' | '#' | ''). -type topic_filter() :: list(binary() | '+' | '#' | '').
%% This record enapsulates the stream entity from the replication
%% level.
%%
%% TODO: currently the stream is hardwired to only support the
%% internal rocksdb storage. In t he future we want to add another
%% implementations for emqx_ds, so this type has to take this into
%% account.
-record(stream,
{ shard :: emqx_ds_replication_layer:shard()
, enc :: emqx_ds_replication_layer:stream()
}).
-type stream_rank() :: {integer(), integer()}. -type stream_rank() :: {integer(), integer()}.
-opaque stream() :: #stream{}. -opaque stream() :: emqx_ds_replication_layer:stream().
%% This record encapsulates the iterator entity from the replication -opaque iterator() :: emqx_ds_replication_layer:iterator().
%% level.
-record(iterator,
{ shard :: emqx_ds_replication_layer:shard()
, enc :: enqx_ds_replication_layer:iterator()
}).
-opaque iterator() :: #iterator{}. -type store_batch_result() :: ok | {error, _}.
-type make_iterator_result(Iterator) :: {ok, Iterator} | {error, _}.
-type make_iterator_result() :: make_iterator_result(iterator()).
-type next_result(Iterator) ::
{ok, Iterator, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}.
-type next_result() :: next_result(iterator()).
%% Timestamp %% Timestamp
%% Earliest possible timestamp is 0. %% Earliest possible timestamp is 0.
@ -78,7 +83,9 @@
-type message_store_opts() :: #{}. -type message_store_opts() :: #{}.
-type create_db_opts() :: #{}. -type create_db_opts() ::
%% TODO: keyspace
#{}.
-type message_id() :: emqx_ds_replication_layer:message_id(). -type message_id() :: emqx_ds_replication_layer:message_id().
@ -88,24 +95,24 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
%% @doc Different DBs are completely independent from each other. They
%% could represent something like different tenants.
-spec open_db(db(), create_db_opts()) -> ok. -spec open_db(db(), create_db_opts()) -> ok.
open_db(DB, Opts) -> open_db(DB, Opts) ->
emqx_ds_replication_layer:open_db(DB, Opts). emqx_ds_replication_layer:open_db(DB, Opts).
-spec message_store([emqx_types:message()]) -> -spec store_batch([emqx_types:message()]) -> store_batch_result().
{ok, [message_id()]} | {error, _}. store_batch(Msgs) ->
message_store(Msgs) -> store_batch(?DEFAULT_DB, Msgs, #{}).
message_store(?DEFAULT_DB, Msgs, #{}).
-spec message_store(db(), [emqx_types:message()], message_store_opts()) -> -spec store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result().
{ok, [message_id()]} | {error, _}. store_batch(DB, Msgs, Opts) ->
message_store(DB, Msgs, Opts) -> emqx_ds_replication_layer:store_batch(DB, Msgs, Opts).
emqx_ds_replication_layer:message_store(DB, Msgs, Opts).
%% TODO: Do we really need to return message IDs? It's extra work... %% TODO: Do we really need to return message IDs? It's extra work...
-spec message_store(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}. -spec store_batch(db(), [emqx_types:message()]) -> {ok, [message_id()]} | {error, _}.
message_store(DB, Msgs) -> store_batch(DB, Msgs) ->
message_store(DB, Msgs, #{}). store_batch(DB, Msgs, #{}).
%% @doc Get a list of streams needed for replaying a topic filter. %% @doc Get a list of streams needed for replaying a topic filter.
%% %%
@ -113,56 +120,44 @@ message_store(DB, Msgs) ->
%% different locations or even in different databases. A wildcard %% different locations or even in different databases. A wildcard
%% topic filter may require pulling data from any number of locations. %% topic filter may require pulling data from any number of locations.
%% %%
%% Stream is an abstraction exposed by `emqx_ds' that reflects the %% Stream is an abstraction exposed by `emqx_ds' that, on one hand,
%% notion that different topics can be stored differently, but hides %% reflects the notion that different topics can be stored
%% the implementation details. %% differently, but hides the implementation details.
%% %%
%% Rules: %% Rules:
%% %%
%% 1. New streams matching the topic filter can appear without notice, %% 0. There is no 1-to-1 mapping between MQTT topics and streams. One
%% so the replayer must periodically call this function to get the %% stream can contain any number of MQTT topics.
%% updated list of streams. %%
%% 1. New streams matching the topic filter and start time can appear
%% without notice, so the replayer must periodically call this
%% function to get the updated list of streams.
%% %%
%% 2. Streams may depend on one another. Therefore, care should be %% 2. Streams may depend on one another. Therefore, care should be
%% taken while replaying them in parallel to avoid out-of-order %% taken while replaying them in parallel to avoid out-of-order
%% replay. This function returns stream together with its %% replay. This function returns stream together with its
%% "coordinates": `{X, T, Stream}'. If X coordinate of two streams is %% "coordinate": `stream_rank()'.
%% different, then they can be replayed in parallel. If it's the %%
%% same, then the stream with smaller T coordinate should be replayed %% Stream rank is a tuple of two integers, let's call them X and Y. If
%% first. %% X coordinate of two streams is different, they are independent and
%% can be replayed in parallel. If it's the same, then the stream with
%% smaller Y coordinate should be replayed first. If Y coordinates are
%% equal, then the streams are independent.
%%
%% Stream is fully consumed when `next/3' function returns
%% `end_of_stream'. Then the client can proceed to replaying streams
%% that depend on the given one.
-spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
get_streams(DB, TopicFilter, StartTime) -> get_streams(DB, TopicFilter, StartTime) ->
Shards = emqx_ds_replication_layer:list_shards(DB), emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime).
lists:flatmap(
fun(Shard) ->
Streams = emqx_ds_replication_layer:get_streams(Shard, TopicFilter, StartTime),
[{Rank, #stream{ shard = Shard
, enc = I
}} || {Rank, I} <- Streams]
end,
Shards).
-spec open_iterator(stream(), time()) -> {ok, iterator()} | {error, _}. -spec make_iterator(stream(), time()) -> make_iterator_result().
open_iterator(#stream{shard = Shard, enc = Stream}, StartTime) -> make_iterator(Stream, StartTime) ->
case emqx_ds_replication_layer:open_iterator(Shard, Stream, StartTime) of emqx_ds_replication_layer:make_iterator(Stream, StartTime).
{ok, Iter} ->
{ok, #iterator{shard = Shard, enc = Iter}};
Err = {error, _} ->
Err
end.
-spec next(iterator(), pos_integer()) -> {ok, iterator(), [emqx_types:message()]} | end_of_stream. -spec next(iterator(), pos_integer()) -> next_result().
next(#iterator{shard = Shard, enc = Iter0}, BatchSize) -> next(Iter, BatchSize) ->
case emqx_ds_replication_layer:next(Shard, Iter0, BatchSize) of emqx_ds_replication_layer:next(Iter, BatchSize).
{ok, Iter, Batch} ->
{ok, #iterator{shard = Shard, enc = Iter}, Batch};
end_of_stream ->
end_of_stream
end.
%%================================================================================
%% behavior callbacks
%%================================================================================
%%================================================================================ %%================================================================================
%% Internal exports %% Internal exports

View File

@ -34,7 +34,8 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-define(EOT, []). %% End Of Topic %% End Of Topic
-define(EOT, []).
-define(PLUS, '+'). -define(PLUS, '+').
-type edge() :: binary() | ?EOT | ?PLUS. -type edge() :: binary() | ?EOT | ?PLUS.
@ -49,17 +50,17 @@
-type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()). -type threshold_fun() :: fun((non_neg_integer()) -> non_neg_integer()).
-record(trie, -record(trie, {
{ trie :: ets:tid() trie :: ets:tid(),
, stats :: ets:tid() stats :: ets:tid()
}). }).
-opaque trie() :: #trie{}. -opaque trie() :: #trie{}.
-record(trans, -record(trans, {
{ key :: {state(), edge()} key :: {state(), edge()},
, next :: state() next :: state()
}). }).
%%================================================================================ %%================================================================================
%% API funcions %% API funcions
@ -70,8 +71,9 @@
trie_create() -> trie_create() ->
Trie = ets:new(trie, [{keypos, #trans.key}, set]), Trie = ets:new(trie, [{keypos, #trans.key}, set]),
Stats = ets:new(stats, [{keypos, 1}, set]), Stats = ets:new(stats, [{keypos, 1}, set]),
#trie{ trie = Trie #trie{
, stats = Stats trie = Trie,
stats = Stats
}. }.
%% @doc Create a topic key, %% @doc Create a topic key,
@ -98,10 +100,11 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
lists:foldl( lists:foldl(
fun(#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) -> fun(#trans{key = {From, Label}, next = To}, {AccN, AccEdge}) ->
Edge = {From, To, Label}, Edge = {From, To, Label},
{[From, To] ++ AccN, [Edge|AccEdge]} {[From, To] ++ AccN, [Edge | AccEdge]}
end, end,
{[], []}, {[], []},
L), L
),
Nodes = Nodes =
lists:map( lists:map(
fun(Node) -> fun(Node) ->
@ -111,9 +114,11 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
end, end,
{Node, NChildren} {Node, NChildren}
end, end,
lists:usort(Nodes0)), lists:usort(Nodes0)
),
{ok, FD} = file:open(Filename, [write]), {ok, FD} = file:open(Filename, [write]),
Print = fun (?PREFIX) -> "prefix"; Print = fun
(?PREFIX) -> "prefix";
(NodeId) -> binary:encode_hex(NodeId) (NodeId) -> binary:encode_hex(NodeId)
end, end,
io:format(FD, "digraph {~n", []), io:format(FD, "digraph {~n", []),
@ -122,12 +127,14 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
Id = Print(Node), Id = Print(Node),
io:format(FD, " \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren]) io:format(FD, " \"~s\" [label=\"~s : ~p\"];~n", [Id, Id, NChildren])
end, end,
Nodes), Nodes
),
lists:foreach( lists:foreach(
fun({From, To, Label}) -> fun({From, To, Label}) ->
io:format(FD, " \"~s\" -> \"~s\" [label=\"~s\"];~n", [Print(From), Print(To), Label]) io:format(FD, " \"~s\" -> \"~s\" [label=\"~s\"];~n", [Print(From), Print(To), Label])
end, end,
Edges), Edges
),
io:format(FD, "}~n", []), io:format(FD, "}~n", []),
file:close(FD). file:close(FD).
@ -135,8 +142,8 @@ dump_to_dot(#trie{trie = Trie, stats = Stats}, Filename) ->
%% Internal exports %% Internal exports
%%================================================================================ %%================================================================================
-spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined -spec trie_next(trie(), state(), binary() | ?EOT) -> {Wildcard, state()} | undefined when
when Wildcard :: boolean(). Wildcard :: boolean().
trie_next(#trie{trie = Trie}, State, ?EOT) -> trie_next(#trie{trie = Trie}, State, ?EOT) ->
case ets:lookup(Trie, {State, ?EOT}) of case ets:lookup(Trie, {State, ?EOT}) of
[#trans{next = Next}] -> {false, Next}; [#trans{next = Next}] -> {false, Next};
@ -153,17 +160,19 @@ trie_next(#trie{trie = Trie}, State, Token) ->
end end
end. end.
-spec trie_insert(trie(), state(), edge()) -> {Updated, state()} -spec trie_insert(trie(), state(), edge()) -> {Updated, state()} when
when Updated :: false | non_neg_integer(). Updated :: false | non_neg_integer().
trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) -> trie_insert(#trie{trie = Trie, stats = Stats}, State, Token) ->
Key = {State, Token}, Key = {State, Token},
NewState = get_id_for_key(State, Token), NewState = get_id_for_key(State, Token),
Rec = #trans{ key = Key Rec = #trans{
, next = NewState key = Key,
next = NewState
}, },
case ets:insert_new(Trie, Rec) of case ets:insert_new(Trie, Rec) of
true -> true ->
Inc = case Token of Inc =
case Token of
?EOT -> 0; ?EOT -> 0;
?PLUS -> 0; ?PLUS -> 0;
_ -> 1 _ -> 1
@ -207,25 +216,29 @@ do_match_topics(Trie, State, Varying, []) ->
do_match_topics(Trie, State, Varying, ['#']) -> do_match_topics(Trie, State, Varying, ['#']) ->
Emanating = emanating(Trie, State, ?PLUS), Emanating = emanating(Trie, State, ?PLUS),
lists:flatmap( lists:flatmap(
fun({?EOT, Static}) -> fun
({?EOT, Static}) ->
[{Static, lists:reverse(Varying)}]; [{Static, lists:reverse(Varying)}];
({?PLUS, NextState}) -> ({?PLUS, NextState}) ->
do_match_topics(Trie, NextState, [?PLUS|Varying], ['#']); do_match_topics(Trie, NextState, [?PLUS | Varying], ['#']);
({_, NextState}) -> ({_, NextState}) ->
do_match_topics(Trie, NextState, Varying, ['#']) do_match_topics(Trie, NextState, Varying, ['#'])
end, end,
Emanating); Emanating
do_match_topics(Trie, State, Varying, [Level|Rest]) -> );
do_match_topics(Trie, State, Varying, [Level | Rest]) ->
Emanating = emanating(Trie, State, Level), Emanating = emanating(Trie, State, Level),
lists:flatmap( lists:flatmap(
fun({?EOT, _NextState}) -> fun
({?EOT, _NextState}) ->
[]; [];
({?PLUS, NextState}) -> ({?PLUS, NextState}) ->
do_match_topics(Trie, NextState, [Level|Varying], Rest); do_match_topics(Trie, NextState, [Level | Varying], Rest);
({_, NextState}) -> ({_, NextState}) ->
do_match_topics(Trie, NextState, Varying, Rest) do_match_topics(Trie, NextState, Varying, Rest)
end, end,
Emanating). Emanating
).
-spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) -> -spec do_lookup_topic_key(trie(), state(), [binary()], [binary()]) ->
{ok, msg_storage_key()} | undefined. {ok, msg_storage_key()} | undefined.
@ -236,10 +249,10 @@ do_lookup_topic_key(Trie, State, [], Varying) ->
undefined -> undefined ->
undefined undefined
end; end;
do_lookup_topic_key(Trie, State, [Tok|Rest], Varying) -> do_lookup_topic_key(Trie, State, [Tok | Rest], Varying) ->
case trie_next(Trie, State, Tok) of case trie_next(Trie, State, Tok) of
{true, NextState} -> {true, NextState} ->
do_lookup_topic_key(Trie, NextState, Rest, [Tok|Varying]); do_lookup_topic_key(Trie, NextState, Rest, [Tok | Varying]);
{false, NextState} -> {false, NextState} ->
do_lookup_topic_key(Trie, NextState, Rest, Varying); do_lookup_topic_key(Trie, NextState, Rest, Varying);
undefined -> undefined ->
@ -249,21 +262,23 @@ do_lookup_topic_key(Trie, State, [Tok|Rest], Varying) ->
do_topic_key(Trie, _, _, State, [], Varying) -> do_topic_key(Trie, _, _, State, [], Varying) ->
{_, false, Static} = trie_next_(Trie, State, ?EOT), {_, false, Static} = trie_next_(Trie, State, ?EOT),
{Static, lists:reverse(Varying)}; {Static, lists:reverse(Varying)};
do_topic_key(Trie, ThresholdFun, Depth, State, [Tok|Rest], Varying0) -> do_topic_key(Trie, ThresholdFun, Depth, State, [Tok | Rest], Varying0) ->
Threshold = ThresholdFun(Depth), % TODO: it's not necessary to call it every time. % TODO: it's not necessary to call it every time.
Varying = case trie_next_(Trie, State, Tok) of Threshold = ThresholdFun(Depth),
Varying =
case trie_next_(Trie, State, Tok) of
{NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold -> {NChildren, _, _DiscardState} when is_integer(NChildren), NChildren > Threshold ->
{_, NextState} = trie_insert(Trie, State, ?PLUS), {_, NextState} = trie_insert(Trie, State, ?PLUS),
[Tok|Varying0]; [Tok | Varying0];
{_, false, NextState} -> {_, false, NextState} ->
Varying0; Varying0;
{_, true, NextState} -> {_, true, NextState} ->
[Tok|Varying0] [Tok | Varying0]
end, end,
do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying). do_topic_key(Trie, ThresholdFun, Depth + 1, NextState, Rest, Varying).
-spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} -spec trie_next_(trie(), state(), binary() | ?EOT) -> {New, Wildcard, state()} when
when New :: false | non_neg_integer(), New :: false | non_neg_integer(),
Wildcard :: boolean(). Wildcard :: boolean().
trie_next_(Trie, State, Token) -> trie_next_(Trie, State, Token) ->
case trie_next(Trie, State, Token) of case trie_next(Trie, State, Token) of
@ -278,19 +293,26 @@ trie_next_(Trie, State, Token) ->
%% erlfmt-ignore %% erlfmt-ignore
-spec emanating(trie(), state(), edge()) -> [{edge(), state()}]. -spec emanating(trie(), state(), edge()) -> [{edge(), state()}].
emanating(#trie{trie = Tab}, State, ?PLUS) -> emanating(#trie{trie = Tab}, State, ?PLUS) ->
ets:select(Tab, ets:fun2ms( ets:select(
Tab,
ets:fun2ms(
fun(#trans{key = {S, Edge}, next = Next}) when S == State -> fun(#trans{key = {S, Edge}, next = Next}) when S == State ->
{Edge, Next} {Edge, Next}
end)); end
)
);
emanating(#trie{trie = Tab}, State, ?EOT) -> emanating(#trie{trie = Tab}, State, ?EOT) ->
case ets:lookup(Tab, {State, ?EOT}) of case ets:lookup(Tab, {State, ?EOT}) of
[#trans{next = Next}] -> [{?EOT, Next}]; [#trans{next = Next}] -> [{?EOT, Next}];
[] -> [] [] -> []
end; end;
emanating(#trie{trie = Tab}, State, Bin) when is_binary(Bin) -> emanating(#trie{trie = Tab}, State, Bin) when is_binary(Bin) ->
[{Edge, Next} || #trans{key = {_, Edge}, next = Next} <- [
{Edge, Next}
|| #trans{key = {_, Edge}, next = Next} <-
ets:lookup(Tab, {State, ?PLUS}) ++ ets:lookup(Tab, {State, ?PLUS}) ++
ets:lookup(Tab, {State, Bin})]. ets:lookup(Tab, {State, Bin})
].
%%================================================================================ %%================================================================================
%% Tests %% Tests
@ -325,55 +347,70 @@ lookup_key_test() ->
{_, S1} = trie_insert(T, ?PREFIX, <<"foo">>), {_, S1} = trie_insert(T, ?PREFIX, <<"foo">>),
{_, S11} = trie_insert(T, S1, <<"foo">>), {_, S11} = trie_insert(T, S1, <<"foo">>),
%% Topics don't match until we insert ?EOT: %% Topics don't match until we insert ?EOT:
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"foo">>]) undefined,
lookup_topic_key(T, [<<"foo">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"foo">>, <<"foo">>]) undefined,
lookup_topic_key(T, [<<"foo">>, <<"foo">>])
), ),
{_, S10} = trie_insert(T, S1, ?EOT), {_, S10} = trie_insert(T, S1, ?EOT),
{_, S110} = trie_insert(T, S11, ?EOT), {_, S110} = trie_insert(T, S11, ?EOT),
?assertMatch( {ok, {S10, []}} ?assertMatch(
, lookup_topic_key(T, [<<"foo">>]) {ok, {S10, []}},
lookup_topic_key(T, [<<"foo">>])
), ),
?assertMatch( {ok, {S110, []}} ?assertMatch(
, lookup_topic_key(T, [<<"foo">>, <<"foo">>]) {ok, {S110, []}},
lookup_topic_key(T, [<<"foo">>, <<"foo">>])
), ),
%% The rest of keys still don't match: %% The rest of keys still don't match:
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"bar">>]) undefined,
lookup_topic_key(T, [<<"bar">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"bar">>, <<"foo">>]) undefined,
lookup_topic_key(T, [<<"bar">>, <<"foo">>])
). ).
wildcard_lookup_test() -> wildcard_lookup_test() ->
T = trie_create(), T = trie_create(),
{1, S1} = trie_insert(T, ?PREFIX, <<"foo">>), {1, S1} = trie_insert(T, ?PREFIX, <<"foo">>),
{0, S11} = trie_insert(T, S1, ?PLUS), %% Plus doesn't increase the number of children %% Plus doesn't increase the number of children
{0, S11} = trie_insert(T, S1, ?PLUS),
{1, S111} = trie_insert(T, S11, <<"foo">>), {1, S111} = trie_insert(T, S11, <<"foo">>),
{0, S1110} = trie_insert(T, S111, ?EOT), %% ?EOT doesn't increase the number of children %% ?EOT doesn't increase the number of children
?assertMatch( {ok, {S1110, [<<"bar">>]}} {0, S1110} = trie_insert(T, S111, ?EOT),
, lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"foo">>]) ?assertMatch(
{ok, {S1110, [<<"bar">>]}},
lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"foo">>])
), ),
?assertMatch( {ok, {S1110, [<<"quux">>]}} ?assertMatch(
, lookup_topic_key(T, [<<"foo">>, <<"quux">>, <<"foo">>]) {ok, {S1110, [<<"quux">>]}},
lookup_topic_key(T, [<<"foo">>, <<"quux">>, <<"foo">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"foo">>]) undefined,
lookup_topic_key(T, [<<"foo">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"foo">>, <<"bar">>]) undefined,
lookup_topic_key(T, [<<"foo">>, <<"bar">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"bar">>]) undefined,
lookup_topic_key(T, [<<"foo">>, <<"bar">>, <<"bar">>])
), ),
?assertMatch( undefined ?assertMatch(
, lookup_topic_key(T, [<<"bar">>, <<"foo">>, <<"foo">>]) undefined,
lookup_topic_key(T, [<<"bar">>, <<"foo">>, <<"foo">>])
), ),
{_, S10} = trie_insert(T, S1, ?EOT), {_, S10} = trie_insert(T, S1, ?EOT),
?assertMatch( {ok, {S10, []}} ?assertMatch(
, lookup_topic_key(T, [<<"foo">>]) {ok, {S10, []}},
lookup_topic_key(T, [<<"foo">>])
). ).
%% erlfmt-ignore %% erlfmt-ignore

View File

@ -18,31 +18,50 @@
-export([ -export([
list_shards/1, list_shards/1,
open_db/2, open_db/2,
message_store/3, store_batch/3,
get_streams/3, get_streams/3,
open_iterator/3, make_iterator/2,
next/3 next/2
]). ]).
%% internal exports: %% internal exports:
-export([ do_open_shard_v1/2, -export([
do_open_shard_v1/2,
do_get_streams_v1/3, do_get_streams_v1/3,
do_open_iterator_v1/3, do_make_iterator_v1/3,
do_next_v1/3 do_next_v1/3
]). ]).
-export_type([shard/0, stream/0, iterator/0, message_id/0]). -export_type([shard_id/0, stream/0, iterator/0, message_id/0]).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-opaque stream() :: emqx_ds_storage_layer:stream(). -type db() :: binary().
-type shard() :: binary(). -type shard_id() :: binary().
-opaque iterator() :: emqx_ds_storage_layer:iterator(). %% This record enapsulates the stream entity from the replication
%% level.
%%
%% TODO: currently the stream is hardwired to only support the
%% internal rocksdb storage. In t he future we want to add another
%% implementations for emqx_ds, so this type has to take this into
%% account.
-record(stream, {
shard :: emqx_ds_replication_layer:shard_id(),
enc :: emqx_ds_replication_layer:stream()
}).
-opaque stream() :: stream().
-record(iterator, {
shard :: emqx_ds_replication_layer:shard_id(),
enc :: enqx_ds_replication_layer:iterator()
}).
-opaque iterator() :: #iterator{}.
-type message_id() :: emqx_ds_storage_layer:message_id(). -type message_id() :: emqx_ds_storage_layer:message_id().
@ -50,44 +69,71 @@
%% API functions %% API functions
%%================================================================================ %%================================================================================
-spec list_shards(emqx_ds:db()) -> [shard()]. -spec list_shards(emqx_ds:db()) -> [shard_id()].
list_shards(DB) -> list_shards(DB) ->
%% TODO: milestone 5 %% TODO: milestone 5
lists:map( lists:map(
fun(Node) -> fun(Node) ->
shard_id(DB, Node) shard_id(DB, Node)
end, end,
list_nodes()). list_nodes()
).
-spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok. -spec open_db(emqx_ds:db(), emqx_ds:create_db_opts()) -> ok | {error, _}.
open_db(DB, Opts) -> open_db(DB, Opts) ->
%% TODO: improve error reporting, don't just crash
lists:foreach( lists:foreach(
fun(Node) -> fun(Node) ->
Shard = shard_id(DB, Node), Shard = shard_id(DB, Node),
emqx_ds_proto_v1:open_shard(Node, Shard, Opts) ok = emqx_ds_proto_v1:open_shard(Node, Shard, Opts)
end, end,
list_nodes()). list_nodes()
).
-spec message_store(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
{ok, [message_id()]} | {error, _}. emqx_ds:store_batch_result().
message_store(DB, Msg, Opts) -> store_batch(DB, Msg, Opts) ->
%% TODO: milestone 5. Currently we store messages locally. %% TODO: Currently we store messages locally.
Shard = term_to_binary({DB, node()}), Shard = shard_id(DB, node()),
emqx_ds_storage_layer:message_store(Shard, Msg, Opts). emqx_ds_storage_layer:store_batch(Shard, Msg, Opts).
-spec get_streams(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
get_streams(Shard, TopicFilter, StartTime) -> [{emqx_ds:stream_rank(), stream()}].
get_streams(DB, TopicFilter, StartTime) ->
Shards = emqx_ds_replication_layer:list_shards(DB),
lists:flatmap(
fun(Shard) ->
Node = node_of_shard(Shard), Node = node_of_shard(Shard),
emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime). Streams = emqx_ds_proto_v1:get_streams(Node, Shard, TopicFilter, StartTime),
lists:map(
fun({RankY, Stream}) ->
RankX = erlang:phash2(Shard, 255),
Rank = {RankX, RankY},
{Rank, #stream{
shard = Shard,
enc = Stream
}}
end,
Streams
)
end,
Shards
).
-spec open_iterator(shard(), stream(), emqx_ds:time()) -> {ok, iterator()} | {error, _}. -spec make_iterator(stream(), emqx_ds:time()) -> emqx_ds:make_iterator_result(iterator()).
open_iterator(Shard, Stream, StartTime) -> make_iterator(Stream, StartTime) ->
#stream{shard = Shard, enc = StorageStream} = Stream,
Node = node_of_shard(Shard), Node = node_of_shard(Shard),
emqx_ds_proto_v1:open_iterator(Node, Shard, Stream, StartTime). case emqx_ds_proto_v1:make_iterator(Node, Shard, StorageStream, StartTime) of
{ok, Iter} ->
{ok, #iterator{shard = Shard, enc = Iter}};
Err = {error, _} ->
Err
end.
-spec next(shard(), iterator(), pos_integer()) -> -spec next(iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
{ok, iterator(), [emqx_types:message()]} | end_of_stream. next(Iter0, BatchSize) ->
next(Shard, Iter, BatchSize) -> #iterator{shard = Shard, enc = StorageIter0} = Iter0,
Node = node_of_shard(Shard), Node = node_of_shard(Shard),
%% TODO: iterator can contain information that is useful for %% TODO: iterator can contain information that is useful for
%% reconstructing messages sent over the network. For example, %% reconstructing messages sent over the network. For example,
@ -97,7 +143,13 @@ next(Shard, Iter, BatchSize) ->
%% %%
%% This kind of trickery should be probably done here in the %% This kind of trickery should be probably done here in the
%% replication layer. Or, perhaps, in the logic lary. %% replication layer. Or, perhaps, in the logic lary.
emqx_ds_proto_v1:next(Node, Shard, Iter, BatchSize). case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of
{ok, StorageIter, Batch} ->
Iter = #iterator{shard = Shard, enc = StorageIter},
{ok, Iter, Batch};
Other ->
Other
end.
%%================================================================================ %%================================================================================
%% behavior callbacks %% behavior callbacks
@ -107,35 +159,38 @@ next(Shard, Iter, BatchSize) ->
%% Internal exports (RPC targets) %% Internal exports (RPC targets)
%%================================================================================ %%================================================================================
-spec do_open_shard_v1(shard(), emqx_ds:create_db_opts()) -> ok. -spec do_open_shard_v1(shard_id(), emqx_ds:create_db_opts()) -> ok.
do_open_shard_v1(Shard, Opts) -> do_open_shard_v1(Shard, Opts) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Opts). emqx_ds_storage_layer:open_shard(Shard, Opts).
-spec do_get_streams_v1(shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{emqx_ds:stream_rank(), stream()}]. [{integer(), _Stream}].
do_get_streams_v1(Shard, TopicFilter, StartTime) -> do_get_streams_v1(Shard, TopicFilter, StartTime) ->
error({todo, Shard, TopicFilter, StartTime}). emqx_ds_storage_layer:get_streams(Shard, TopicFilter, StartTime).
-spec do_open_iterator_v1(shard(), stream(), emqx_ds:time()) -> iterator(). -spec do_make_iterator_v1(shard_id(), _Stream, emqx_ds:time()) -> {ok, iterator()} | {error, _}.
do_open_iterator_v1(Shard, Stream, StartTime) -> do_make_iterator_v1(Shard, Stream, StartTime) ->
error({todo, Shard, Stream, StartTime}). emqx_ds_storage_layer:make_iterator(Shard, Stream, StartTime).
-spec do_next_v1(shard(), iterator(), non_neg_integer()) -> -spec do_next_v1(shard_id(), Iter, pos_integer()) -> emqx_ds:next_result(Iter).
{ok, iterator(), [emqx_types:message()]} | end_of_stream.
do_next_v1(Shard, Iter, BatchSize) -> do_next_v1(Shard, Iter, BatchSize) ->
error({todo, Shard, Iter, BatchSize}). emqx_ds_storage_layer:next(Shard, Iter, BatchSize).
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
add_shard_to_rank(Shard, RankY) ->
RankX = erlang:phash2(Shard, 255),
{RankX, RankY}.
shard_id(DB, Node) -> shard_id(DB, Node) ->
%% TODO: don't bake node name into the schema, don't repeat the %% TODO: don't bake node name into the schema, don't repeat the
%% Mnesia's 1M$ mistake. %% Mnesia's 1M$ mistake.
NodeBin = atom_to_binary(Node), NodeBin = atom_to_binary(Node),
<<DB/binary, ":", NodeBin/binary>>. <<DB/binary, ":", NodeBin/binary>>.
-spec node_of_shard(shard()) -> node(). -spec node_of_shard(shard_id()) -> node().
node_of_shard(ShardId) -> node_of_shard(ShardId) ->
[_DB, NodeBin] = binary:split(ShardId, <<":">>), [_DB, NodeBin] = binary:split(ShardId, <<":">>),
binary_to_atom(NodeBin). binary_to_atom(NodeBin).

View File

@ -1,332 +1,240 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ds_storage_layer). -module(emqx_ds_storage_layer).
-behaviour(gen_server). -behaviour(gen_server).
%% API: %% Replication layer API:
-export([start_link/2]). -export([open_shard/2, store_batch/3, get_streams/3, make_iterator/3, next/3]).
-export([create_generation/3]).
-export([get_streams/3]). %% gen_server
-export([message_store/3]). -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([delete/4]).
-export([make_iterator/2, next/1, next/2]). %% internal exports:
-export([]).
-export([ -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).
preserve_iterator/2,
restore_iterator/2,
discard_iterator/2,
ensure_iterator/3,
discard_iterator_prefix/2,
list_iterator_prefix/2,
foldl_iterator_prefix/4
]).
%% behaviour callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
-export_type([db_options/0, db_write_options/0, db_read_options/0]).
-compile({inline, [meta_lookup/2]}).
-include_lib("emqx/include/emqx.hrl").
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-type stream() :: term(). %% Opaque term returned by the generation callback module -type shard_id() :: emqx_ds_replication_layer:shard_id().
-type options() :: #{
dir => file:filename()
}.
%% see rocksdb:db_options()
-type db_options() :: proplists:proplist().
%% see rocksdb:write_options()
-type db_write_options() :: proplists:proplist().
%% see rocksdb:read_options()
-type db_read_options() :: proplists:proplist().
-type cf_refs() :: [{string(), rocksdb:cf_handle()}]. -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
%% Message storage generation -type gen_id() :: 0..16#ffff.
%% Keep in mind that instances of this type are persisted in long-term storage.
-type generation() :: #{ %% Note: this record might be stored permanently on a remote node.
%% Module that handles data for the generation -record(stream, {
generation :: gen_id(),
enc :: _EncapsultatedData,
misc = #{} :: map()
}).
-opaque stream() :: #stream{}.
%% Note: this record might be stored permanently on a remote node.
-record(it, {
generation :: gen_id(),
enc :: _EncapsultatedData,
misc = #{} :: map()
}).
-opaque iterator() :: #it{}.
%%%% Generation:
-type generation(Data) :: #{
%% Module that handles data for the generation:
module := module(), module := module(),
%% Module-specific data defined at generation creation time %% Module-specific data defined at generation creation time:
data := term(), data := Data,
%% When should this generation become active? %% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that. %% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal 0. %% The very first generation will have `since` equal 0.
since := emqx_ds:time() since := emqx_ds:time(),
until := emqx_ds:time() | undefined
}. }.
-record(s, { %% Schema for a generation. Persistent term.
shard :: emqx_ds:shard(), -type generation_schema() :: generation(term()).
keyspace :: emqx_ds_conf:keyspace(),
db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
}).
-record(it, { %% Runtime view of generation:
shard :: emqx_ds:shard(), -type generation() :: generation(term()).
gen :: gen_id(),
replay :: emqx_ds:replay(),
module :: module(),
data :: term()
}).
-type gen_id() :: 0..16#ffff. %%%% Shard:
-opaque state() :: #s{}. -type shard(GenData) :: #{
-opaque iterator() :: #it{}. current_generation := gen_id(),
default_generation_module := module(),
default_generation_config := term(),
{generation, gen_id()} => GenData
}.
%% Contents of the default column family: %% Shard schema (persistent):
%% -type shard_schema() :: shard(generation_schema()).
%% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}]
-define(DEFAULT_CF, "default"). %% Shard (runtime):
-define(DEFAULT_CF_OPTS, []). -type shard() :: shard(generation()).
-define(ITERATOR_CF, "$iterators"). %%================================================================================
%% Generation callbacks
%%================================================================================
%% TODO %% Create the new schema given generation id and the options.
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`. %% Create rocksdb column families.
%% 2. Supposedly might be compressed _very_ effectively. -callback create(shard_id(), rocksdb:db_handle(), gen_id(), _Options) ->
%% 3. `inplace_update_support`? {_Schema, cf_refs()}.
-define(ITERATOR_CF_OPTS, []).
%% Open the existing schema
-callback open(shard_id(), rocsdb:db_handle(), gen_id(), cf_refs(), _Schema) ->
_Data.
-callback store_batch(shard_id(), _Data, [emqx_types:message()], emqx_ds:message_store_opts()) ->
ok.
-callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream].
-callback make_iterator(shard_id(), _Data, _Stream, emqx_ds:time()) ->
emqx_ds:make_iterator_result(_Iterator).
-callback next(shard_id(), _Data, Iter, pos_integer()) ->
{ok, Iter, [emqx_types:message()]} | {error, _}.
%%================================================================================
%% API for the replication layer
%%================================================================================
-spec open_shard(shard_id(), emqx_ds:create_db_opts()) -> ok.
open_shard(Shard, Options) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
-spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result().
store_batch(Shard, Messages, Options) ->
%% We always store messages in the current generation:
GenId = generation_current(Shard),
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
Mod:store_batch(Shard, GenData, Messages, Options).
-spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[{integer(), stream()}].
get_streams(Shard, TopicFilter, StartTime) ->
Gens = generations_since(Shard, StartTime),
lists:flatmap(
fun(GenId) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
[
{GenId, #stream{
generation = GenId,
enc = Stream
}}
|| Stream <- Streams
]
end,
Gens
).
-spec make_iterator(shard_id(), stream(), emqx_ds:time()) ->
emqx_ds:make_iterator_result(iterator()).
make_iterator(Shard, #stream{generation = GenId, enc = Stream}, StartTime) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
case Mod:make_iterator(Shard, GenData, Stream, StartTime) of
{ok, Iter} ->
{ok, #it{
generation = GenId,
enc = Iter
}};
{error, _} = Err ->
Err
end.
-spec next(shard_id(), iterator(), pos_integer()) ->
emqx_ds:next_result(iterator()).
next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
Current = generation_current(Shard),
case Mod:next(Shard, GenData, GenIter0, BatchSize) of
{ok, _GenIter, []} when GenId < Current ->
%% This is a past generation. Storage layer won't write
%% any more messages here. The iterator reached the end:
%% the stream has been fully replayed.
{ok, end_of_stream};
{ok, GenIter, Batch} ->
{ok, Iter#it{enc = GenIter}, Batch};
Error = {error, _} ->
Error
end.
%%================================================================================
%% gen_server for the shard
%%================================================================================
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
%%================================================================================ -spec start_link(emqx_ds:shard_id(), emqx_ds:create_db_opts()) ->
%% Callbacks
%%================================================================================
-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
{_Schema, cf_refs()}.
-callback open(
emqx_ds:shard(),
rocksdb:db_handle(),
gen_id(),
cf_refs(),
_Schema
) ->
_DB.
-callback store(
_DB,
_MessageID :: binary(),
emqx_ds:time(),
emqx_ds:topic(),
_Payload :: binary()
) ->
ok | {error, _}.
-callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
-callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) ->
[_Stream].
-callback make_iterator(_DB, emqx_ds:replay()) ->
{ok, _It} | {error, _}.
-callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}.
-callback preserve_iterator(_It) -> term().
-callback next(It) -> {value, binary(), It} | none | {error, closed}.
%%================================================================================
%% API funcions
%%================================================================================
-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
{ok, pid()}. {ok, pid()}.
start_link(Shard, Options) -> start_link(Shard, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
-spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. -record(s, {
get_streams(_ShardId, _TopicFilter, _StartTime) -> shard_id :: emqx_ds:shard_id(),
[]. db :: rocksdb:db_handle(),
cf_refs :: cf_refs(),
schema :: shard_schema(),
shard :: shard()
}).
-type server_state() :: #s{}.
-spec create_generation( -define(DEFAULT_CF, "default").
emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config() -define(DEFAULT_CF_OPTS, []).
) ->
{ok, gen_id()} | {error, nonmonotonic}.
create_generation(ShardId, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(ShardId), {create_generation, Since, Config}).
-spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) -> init({ShardId, Options}) ->
{ok, _MessageId} | {error, _}.
message_store(Shard, Msgs, _Opts) ->
{ok, lists:map(
fun(Msg) ->
GUID = emqx_message:id(Msg),
Timestamp = Msg#message.timestamp,
{_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp),
Topic = emqx_topic:words(emqx_message:topic(Msg)),
Payload = serialize(Msg),
Mod:store(ModState, GUID, Timestamp, Topic, Payload),
GUID
end,
Msgs)}.
-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
delete(Shard, GUID, Time, Topic) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:delete(Data, GUID, Time, Topic).
-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Replay = {_, StartTime}) ->
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
open_iterator(Gen, #it{
shard = Shard,
gen = GenId,
replay = Replay
}).
-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream.
next(It = #it{}) ->
next(It, _BatchSize = 1).
-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream.
next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) ->
end_of_stream;
next(
It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize
) ->
#{data := DBData} = meta_get_gen(Shard, Gen),
{ok, ItData} = Mod:restore_iterator(DBData, Serialized),
next(It#it{data = ItData}, BatchSize);
next(It = #it{}, BatchSize) ->
do_next(It, BatchSize, _Acc = []).
-spec do_next(iterator(), non_neg_integer(), [binary()]) ->
{ok, iterator(), [binary()]} | end_of_stream.
do_next(It, N, Acc) when N =< 0 ->
{ok, It, lists:reverse(Acc)};
do_next(It = #it{module = Mod, data = ItData}, N, Acc) ->
case Mod:next(ItData) of
{value, Bin, ItDataNext} ->
Val = deserialize(Bin),
do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]);
{error, _} = _Error ->
%% todo: log?
%% iterator might be invalid now; will need to re-open it.
Serialized = Mod:preserve_iterator(ItData),
{ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
none ->
case open_next_iterator(It) of
{ok, ItNext} ->
do_next(ItNext, N, Acc);
{error, _} = _Error ->
%% todo: log?
%% fixme: only bad options may lead to this?
%% return an "empty" iterator to be re-opened when retrying?
Serialized = Mod:preserve_iterator(ItData),
{ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
none ->
case Acc of
[] ->
end_of_stream;
_ ->
{ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)}
end
end
end.
-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) ->
ok | {error, _TODO}.
preserve_iterator(It = #it{}, IteratorID) ->
iterator_put_state(IteratorID, It).
-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(Shard, ReplayID) ->
case iterator_get_state(Shard, ReplayID) of
{ok, Serial} ->
restore_iterator_state(Shard, Serial);
not_found ->
{error, not_found};
{error, _Reason} = Error ->
Error
end.
-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) ->
case restore_iterator(Shard, IteratorID) of
{ok, It} ->
{ok, It};
{error, not_found} ->
{ok, It} = make_iterator(Shard, Replay),
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
{ok, It};
Error ->
Error
end.
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
ok | {error, _TODO}.
discard_iterator(Shard, ReplayID) ->
iterator_delete(Shard, ReplayID).
-spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
ok | {error, _TODO}.
discard_iterator_prefix(Shard, KeyPrefix) ->
case do_discard_iterator_prefix(Shard, KeyPrefix) of
{ok, _} -> ok;
Error -> Error
end.
-spec list_iterator_prefix(
emqx_ds:shard(),
binary()
) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}.
list_iterator_prefix(Shard, KeyPrefix) ->
do_list_iterator_prefix(Shard, KeyPrefix).
-spec foldl_iterator_prefix(
emqx_ds:shard(),
binary(),
fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc),
Acc
) -> {ok, Acc} | {error, _TODO} when
Acc :: term().
foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc).
%%================================================================================
%% behaviour callbacks
%%================================================================================
init({Shard, Options}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, S0} = open_db(Shard, Options), erase_schema_runtime(ShardId),
S = ensure_current_generation(S0), {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
ok = populate_metadata(S), {Schema, CFRefs} =
case get_schema_persistent(DB) of
not_found ->
create_new_shard_schema(ShardId, DB, CFRefs0, Options);
Scm ->
{Scm, CFRefs0}
end,
Shard = open_shard(ShardId, DB, CFRefs, Schema),
S = #s{
shard_id = ShardId,
db = DB,
cf_refs = CFRefs,
schema = Schema,
shard = Shard
},
commit_metadata(S),
{ok, S}. {ok, S}.
handle_call({create_generation, Since, Config}, _From, S) -> %% handle_call({create_generation, Since, Config}, _From, S) ->
case create_new_gen(Since, Config, S) of %% case create_new_gen(Since, Config, S) of
{ok, GenId, NS} -> %% {ok, GenId, NS} ->
{reply, {ok, GenId}, NS}; %% {reply, {ok, GenId}, NS};
{error, _} = Error -> %% {error, _} = Error ->
{reply, Error, S} %% {reply, Error, S}
end; %% end;
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
@ -336,359 +244,156 @@ handle_cast(_Cast, S) ->
handle_info(_Info, S) -> handle_info(_Info, S) ->
{noreply, S}. {noreply, S}.
terminate(_Reason, #s{db = DB, shard = Shard}) -> terminate(_Reason, #s{db = DB, shard_id = ShardId}) ->
meta_erase(Shard), erase_schema_runtime(ShardId),
ok = rocksdb:close(DB). ok = rocksdb:close(DB).
%%================================================================================
%% Internal exports
%%================================================================================
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}). -spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) ->
shard().
open_shard(ShardId, DB, CFRefs, ShardSchema) ->
%% Transform generation schemas to generation runtime data:
maps:map(
fun
({generation, GenId}, GenSchema) ->
open_generation(ShardId, DB, CFRefs, GenId, GenSchema);
(_K, Val) ->
Val
end,
ShardSchema
).
-spec populate_metadata(state()) -> ok. -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) ->
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) -> generation().
ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}), open_generation(ShardId, DB, CFRefs, GenId, GenSchema) ->
Current = schema_get_current(DBHandle), #{module := Mod, data := Schema} = GenSchema,
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)). RuntimeData = Mod:open(ShardId, DB, GenId, CFRefs, Schema),
GenSchema#{data => RuntimeData}.
-spec populate_metadata(gen_id(), state()) -> ok. -spec create_new_shard_schema(shard_id(), rocksdb:db_handle(), cf_refs(), _Options) ->
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) -> {shard_schema(), cf_refs()}.
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S), create_new_shard_schema(ShardId, DB, CFRefs, _Options) ->
meta_register_gen(Shard, GenId, Gen). GenId = 1,
%% TODO: read from options/config
-spec ensure_current_generation(state()) -> state(). Mod = emqx_ds_storage_reference,
ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) -> ModConfig = #{},
case schema_get_current(DBHandle) of {GenData, NewCFRefs} = Mod:create(ShardId, DB, GenId, ModConfig),
undefined -> GenSchema = #{module => Mod, data => GenData, since => 0, until => undefined},
Config = emqx_ds_conf:keyspace_config(Keyspace), ShardSchema = #{
{ok, _, NS} = create_new_gen(0, Config, S), current_generation => GenId,
NS; default_generation_module => Mod,
_GenId -> default_generation_confg => ModConfig,
S {generation, GenId} => GenSchema
end.
-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Shard)),
GenId = get_next_id(schema_get_current(DBHandle)),
case is_gen_valid(Shard, GenId, Since) of
ok ->
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, Gen),
ok = schema_put_current(DBHandle, GenId),
ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
{ok, GenId, NS};
{error, _} = Error ->
Error
end.
-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
% TODO: Backend implementation should ensure idempotency.
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{
module => Module,
data => Schema,
since => Since
}, },
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. {ShardSchema, NewCFRefs ++ CFRefs}.
-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. %% @doc Commit current state of the server to both rocksdb and the persistent term
open_db(Shard, Options) -> -spec commit_metadata(server_state()) -> ok.
commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) ->
ok = put_schema_persistent(DB, Schema),
put_schema_runtime(ShardId, Runtime).
-spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) ->
{ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
rocksdb_open(Shard, Options) ->
DefaultDir = binary_to_list(Shard), DefaultDir = binary_to_list(Shard),
DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
%% TODO: properly forward keyspace
Keyspace = maps:get(keyspace, Options, default_keyspace),
DBOptions = [ DBOptions = [
{create_if_missing, true}, {create_if_missing, true},
{create_missing_column_families, true} {create_missing_column_families, true}
| emqx_ds_conf:db_options(Keyspace) | maps:get(db_options, Options, [])
], ],
_ = filelib:ensure_dir(DBDir), _ = filelib:ensure_dir(DBDir),
ExistingCFs = ExistingCFs =
case rocksdb:list_column_families(DBDir, DBOptions) of case rocksdb:list_column_families(DBDir, DBOptions) of
{ok, CFs} -> {ok, CFs} ->
[{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF];
% DB is not present. First start % DB is not present. First start
{error, {db_open, _}} -> {error, {db_open, _}} ->
[] []
end, end,
ColumnFamilies = [ ColumnFamilies = [
{?DEFAULT_CF, ?DEFAULT_CF_OPTS}, {?DEFAULT_CF, ?DEFAULT_CF_OPTS}
{?ITERATOR_CF, ?ITERATOR_CF_OPTS}
| ExistingCFs | ExistingCFs
], ],
case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> {ok, DBHandle, [_CFDefault | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs), {CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{ {ok, DBHandle, lists:zip(CFNames, CFRefs)};
shard = Shard,
keyspace = Keyspace,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
}};
Error -> Error ->
Error Error
end. end.
-spec open_gen(gen_id(), generation(), state()) -> generation(). %%--------------------------------------------------------------------------------
open_gen( %% Schema access
GenId, %%--------------------------------------------------------------------------------
Gen = #{module := Mod, data := Data},
#s{shard = Shard, db = DBHandle, cf_generations = CFs}
) ->
DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none. -spec generation_current(shard_id()) -> gen_id().
open_next_iterator(It = #it{shard = Shard, gen = GenId}) -> generation_current(Shard) ->
open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}). #{current_generation := Current} = get_schema_runtime(Shard),
Current.
open_next_iterator(undefined, _It) -> -spec generation_get(shard_id(), gen_id()) -> generation().
none; generation_get(Shard, GenId) ->
open_next_iterator(Gen = #{}, It) -> #{{generation, GenId} := GenData} = get_schema_runtime(Shard),
open_iterator(Gen, It). GenData.
-spec open_iterator(generation(), iterator()) -> {ok, iterator()} | {error, _Reason}. -spec generations_since(shard_id(), emqx_ds:time()) -> [gen_id()].
open_iterator(#{module := Mod, data := Data}, It = #it{}) -> generations_since(Shard, Since) ->
Options = #{}, % TODO: passthrough options Schema = get_schema_runtime(Shard),
case Mod:make_iterator(Data, It#it.replay, Options) of maps:fold(
{ok, ItData} -> fun
{ok, It#it{module = Mod, data = ItData}}; ({generation, GenId}, #{until := Until}, Acc) when Until >= Since ->
Err -> [GenId | Acc];
Err (_K, _V, Acc) ->
end. Acc
-spec open_restore_iterator(generation(), iterator(), binary()) ->
{ok, iterator()} | {error, _Reason}.
open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) ->
case Mod:restore_iterator(Data, Serial) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
%%
-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>).
-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin
<<IteratorId:(size(KeyReplayState) - 2)/binary, "rs">> = (KeyReplayState),
IteratorId
end).
-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).
iterator_get_state(Shard, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
iterator_put_state(ID, It = #it{shard = Shard}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
Serial = preserve_iterator_state(It),
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
iterator_delete(Shard, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
preserve_iterator_state(#it{
gen = Gen,
replay = {TopicFilter, StartTime},
module = Mod,
data = ItData
}) ->
term_to_binary(#{
v => 1,
gen => Gen,
filter => TopicFilter,
start => StartTime,
st => Mod:preserve_iterator(ItData)
}).
restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
restore_iterator_state(Shard, binary_to_term(Serial));
restore_iterator_state(
Shard,
#{
v := 1,
gen := Gen,
filter := TopicFilter,
start := StartTime,
st := State
}
) ->
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
do_list_iterator_prefix(Shard, KeyPrefix) ->
Fn = fun(K0, _V, Acc) ->
K = ?KEY_REPLAY_STATE_PAT(K0),
[K | Acc]
end, end,
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). [],
Schema
).
do_discard_iterator_prefix(Shard, KeyPrefix) -> -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}).
#db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok).
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> -spec get_schema_runtime(shard_id()) -> shard().
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), get_schema_runtime(Shard) ->
case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of persistent_term:get(?PERSISTENT_TERM(Shard)).
{ok, It} ->
NextAction = {seek, KeyPrefix},
do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc);
Error ->
Error
end.
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> -spec put_schema_runtime(shard_id(), shard()) -> ok.
case rocksdb:iterator_move(It, NextAction) of put_schema_runtime(Shard, RuntimeSchema) ->
{ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, V} -> persistent_term:put(?PERSISTENT_TERM(Shard), RuntimeSchema),
NewAcc = Fn(K, V, Acc), ok.
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc);
{ok, _K, _V} ->
ok = rocksdb:iterator_close(It),
{ok, Acc};
{error, invalid_iterator} ->
ok = rocksdb:iterator_close(It),
{ok, Acc};
Error ->
ok = rocksdb:iterator_close(It),
Error
end.
%% Functions for dealing with the metadata stored persistently in rocksdb -spec erase_schema_runtime(shard_id()) -> ok.
erase_schema_runtime(Shard) ->
-define(CURRENT_GEN, <<"current">>). persistent_term:erase(?PERSISTENT_TERM(Shard)),
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
schema_get_current(DBHandle) ->
case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
{ok, Bin} ->
binary_to_integer(Bin);
not_found ->
undefined
end.
-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
schema_put_current(DBHandle, GenId) ->
rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
-spec schema_gen_key(integer()) -> binary().
schema_gen_key(N) ->
<<"gen", N:32>>.
-undef(CURRENT_GEN).
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).
%% Functions for dealing with the runtime shard metadata:
-define(PERSISTENT_TERM(SHARD, GEN), {?MODULE, SHARD, GEN}).
-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) ->
Gs =
case GenId > 0 of
true -> meta_lookup(Shard, GenId - 1);
false -> []
end,
ok = meta_put(Shard, GenId, [Gen | Gs]),
ok = meta_put(Shard, current, GenId).
-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
%% TODO
%% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
%% towards a "no".
Current = meta_lookup(Shard, current),
Gens = meta_lookup(Shard, Current),
find_gen(Time, Current, Gens).
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
{GenId, Gen};
find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) ->
case meta_lookup(Shard, GenId, []) of
[Gen | _Older] -> Gen;
[] -> undefined
end.
-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
meta_get_current(Shard) ->
meta_lookup(Shard, current, undefined).
-spec meta_lookup(emqx_ds:shard(), _K) -> _V.
meta_lookup(Shard, K) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K)).
-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-spec meta_erase(emqx_ds:shard()) -> ok.
meta_erase(Shard) ->
[
persistent_term:erase(K)
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
],
ok. ok.
-undef(PERSISTENT_TERM). -undef(PERSISTENT_TERM).
get_next_id(undefined) -> 0; -define(ROCKSDB_SCHEMA_KEY, <<"schema_v1">>).
get_next_id(GenId) -> GenId + 1.
is_gen_valid(Shard, GenId, Since) when GenId > 0 -> -spec get_schema_persistent(rocksdb:db_handle()) -> shard_schema() | not_found.
[GenPrev | _] = meta_lookup(Shard, GenId - 1), get_schema_persistent(DB) ->
case GenPrev of case rocksdb:get(DB, ?ROCKSDB_SCHEMA_KEY, []) of
#{since := SincePrev} when Since > SincePrev -> {ok, Blob} ->
ok; Schema = binary_to_term(Blob),
#{} -> %% Sanity check:
{error, nonmonotonic} #{current_generation := _, default_generation_module := _} = Schema,
end; Schema;
is_gen_valid(_Shard, 0, 0) -> not_found ->
ok. not_found
end.
serialize(Msg) -> -spec put_schema_persistent(rocksdb:db_handle(), shard_schema()) -> ok.
%% TODO: remove topic, GUID, etc. from the stored put_schema_persistent(DB, Schema) ->
%% message. Reconstruct it from the metadata. Blob = term_to_binary(Schema),
term_to_binary(emqx_message:to_map(Msg)). rocksdb:put(DB, ?ROCKSDB_SCHEMA_KEY, Blob, []).
deserialize(Bin) -> -undef(ROCKSDB_SCHEMA_KEY).
emqx_message:from_map(binary_to_term(Bin)).
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(
%% fun({CFName, CFRef}) ->
%% persistent_term:put({self(), CFName}, {DBHandle, CFRef})
%% end,
%% CFRefs).

View File

@ -0,0 +1,714 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_storage_layer).
-behaviour(gen_server).
%% API:
-export([start_link/2]).
-export([create_generation/3]).
-export([open_shard/2, get_streams/3]).
-export([message_store/3]).
-export([delete/4]).
-export([make_iterator/3, next/1, next/2]).
-export([
preserve_iterator/2,
restore_iterator/2,
discard_iterator/2,
ensure_iterator/3,
discard_iterator_prefix/2,
list_iterator_prefix/2,
foldl_iterator_prefix/4
]).
%% gen_server callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([stream/0, cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
-export_type([db_options/0, db_write_options/0, db_read_options/0]).
-compile({inline, [meta_lookup/2]}).
-include_lib("emqx/include/emqx.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
-type options() :: #{
dir => file:filename()
}.
%% see rocksdb:db_options()
-type db_options() :: proplists:proplist().
%% see rocksdb:write_options()
-type db_write_options() :: proplists:proplist().
%% see rocksdb:read_options()
-type db_read_options() :: proplists:proplist().
-type cf_refs() :: [{string(), rocksdb:cf_handle()}].
%% Message storage generation
%% Keep in mind that instances of this type are persisted in long-term storage.
-type generation() :: #{
%% Module that handles data for the generation
module := module(),
%% Module-specific data defined at generation creation time
data := term(),
%% When should this generation become active?
%% This generation should only contain messages timestamped no earlier than that.
%% The very first generation will have `since` equal 0.
since := emqx_ds:time()
}.
-record(s, {
shard :: emqx_ds:shard(),
keyspace :: emqx_ds_conf:keyspace(),
db :: rocksdb:db_handle(),
cf_iterator :: rocksdb:cf_handle(),
cf_generations :: cf_refs()
}).
-record(stream,
{ generation :: gen_id()
, topic_filter :: emqx_ds:topic_filter()
, since :: emqx_ds:time()
, enc :: _EncapsultatedData
}).
-opaque stream() :: #stream{}.
-record(it, {
shard :: emqx_ds:shard(),
gen :: gen_id(),
replay :: emqx_ds:replay(),
module :: module(),
data :: term()
}).
-type gen_id() :: 0..16#ffff.
-opaque state() :: #s{}.
-opaque iterator() :: #it{}.
%% Contents of the default column family:
%%
%% [{<<"genNN">>, #generation{}}, ...,
%% {<<"current">>, GenID}]
-define(DEFAULT_CF, "default").
-define(DEFAULT_CF_OPTS, []).
-define(ITERATOR_CF, "$iterators").
%% TODO
%% 1. CuckooTable might be of use here / `OptimizeForPointLookup(...)`.
%% 2. Supposedly might be compressed _very_ effectively.
%% 3. `inplace_update_support`?
-define(ITERATOR_CF_OPTS, []).
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
%%================================================================================
%% Callbacks
%%================================================================================
-callback create_new(rocksdb:db_handle(), gen_id(), _Options :: term()) ->
{_Schema, cf_refs()}.
-callback open(
emqx_ds:shard(),
rocksdb:db_handle(),
gen_id(),
cf_refs(),
_Schema
) ->
_DB.
-callback store(
_DB,
_MessageID :: binary(),
emqx_ds:time(),
emqx_ds:topic(),
_Payload :: binary()
) ->
ok | {error, _}.
-callback delete(_DB, _MessageID :: binary(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
-callback get_streams(_DB, emqx_ds:topic_filter(), emqx_ds:time()) ->
[{_TopicRankX, _Stream}].
-callback make_iterator(_DB, emqx_ds:replay()) ->
{ok, _It} | {error, _}.
-callback restore_iterator(_DB, _Serialized :: binary()) -> {ok, _It} | {error, _}.
-callback preserve_iterator(_It) -> term().
-callback next(It) -> {value, binary(), It} | none | {error, closed}.
%%================================================================================
%% Replication layer API
%%================================================================================
-spec open_shard(emqx_ds_replication_layer:shard(), emqx_ds_storage_layer:options()) -> ok.
open_shard(Shard, Options) ->
emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).
-spec get_streams(emqx_ds:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), _Stream}].
get_streams(Shard, TopicFilter, StartTime) ->
%% TODO: lookup ALL generations
{GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, StartTime),
lists:map(
fun({RankX, ModStream}) ->
Stream = #stream{ generation = GenId
, topic_filter = TopicFilter
, since = StartTime
, enc = ModStream
},
Rank = {RankX, GenId},
{Rank, Stream}
end,
Mod:get_streams(ModState, TopicFilter, StartTime)).
-spec message_store(emqx_ds:shard(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
{ok, _MessageId} | {error, _}.
message_store(Shard, Msgs, _Opts) ->
{ok, lists:map(
fun(Msg) ->
GUID = emqx_message:id(Msg),
Timestamp = Msg#message.timestamp,
{_GenId, #{module := Mod, data := ModState}} = meta_lookup_gen(Shard, Timestamp),
Topic = emqx_topic:words(emqx_message:topic(Msg)),
Payload = serialize(Msg),
Mod:store(ModState, GUID, Timestamp, Topic, Payload),
GUID
end,
Msgs)}.
-spec next(iterator()) -> {ok, iterator(), [binary()]} | end_of_stream.
next(It = #it{}) ->
next(It, _BatchSize = 1).
-spec next(iterator(), pos_integer()) -> {ok, iterator(), [binary()]} | end_of_stream.
next(#it{data = {?MODULE, end_of_stream}}, _BatchSize) ->
end_of_stream;
next(
It = #it{shard = Shard, module = Mod, gen = Gen, data = {?MODULE, retry, Serialized}}, BatchSize
) ->
#{data := DBData} = meta_get_gen(Shard, Gen),
{ok, ItData} = Mod:restore_iterator(DBData, Serialized),
next(It#it{data = ItData}, BatchSize);
next(It = #it{}, BatchSize) ->
do_next(It, BatchSize, _Acc = []).
%%================================================================================
%% API functions
%%================================================================================
-spec create_generation(
emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()
) ->
{ok, gen_id()} | {error, nonmonotonic}.
create_generation(ShardId, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(ShardId), {create_generation, Since, Config}).
-spec delete(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic()) ->
ok | {error, _}.
delete(Shard, GUID, Time, Topic) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
Mod:delete(Data, GUID, Time, Topic).
-spec make_iterator(emqx_ds:shard(), stream(), emqx_ds:time()) ->
{ok, iterator()} | {error, _TODO}.
make_iterator(Shard, Stream, StartTime) ->
#stream{ topic_filter = TopicFilter
, since = Since
, enc = Enc
} = Stream,
{GenId, Gen} = meta_lookup_gen(Shard, StartTime),
Replay = {TopicFilter, Since},
case Mod:make_iterator(Data, Replay, Options) of
#it{ gen = GenId,
replay = {TopicFilter, Since}
}.
-spec do_next(iterator(), non_neg_integer(), [binary()]) ->
{ok, iterator(), [binary()]} | end_of_stream.
do_next(It, N, Acc) when N =< 0 ->
{ok, It, lists:reverse(Acc)};
do_next(It = #it{module = Mod, data = ItData}, N, Acc) ->
case Mod:next(ItData) of
{value, Bin, ItDataNext} ->
Val = deserialize(Bin),
do_next(It#it{data = ItDataNext}, N - 1, [Val | Acc]);
{error, _} = _Error ->
%% todo: log?
%% iterator might be invalid now; will need to re-open it.
Serialized = Mod:preserve_iterator(ItData),
{ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
none ->
case open_next_iterator(It) of
{ok, ItNext} ->
do_next(ItNext, N, Acc);
{error, _} = _Error ->
%% todo: log?
%% fixme: only bad options may lead to this?
%% return an "empty" iterator to be re-opened when retrying?
Serialized = Mod:preserve_iterator(ItData),
{ok, It#it{data = {?MODULE, retry, Serialized}}, lists:reverse(Acc)};
none ->
case Acc of
[] ->
end_of_stream;
_ ->
{ok, It#it{data = {?MODULE, end_of_stream}}, lists:reverse(Acc)}
end
end
end.
-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) ->
ok | {error, _TODO}.
preserve_iterator(It = #it{}, IteratorID) ->
iterator_put_state(IteratorID, It).
-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
{ok, iterator()} | {error, _TODO}.
restore_iterator(Shard, ReplayID) ->
case iterator_get_state(Shard, ReplayID) of
{ok, Serial} ->
restore_iterator_state(Shard, Serial);
not_found ->
{error, not_found};
{error, _Reason} = Error ->
Error
end.
-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) ->
{ok, iterator()} | {error, _TODO}.
ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) ->
case restore_iterator(Shard, IteratorID) of
{ok, It} ->
{ok, It};
{error, not_found} ->
{ok, It} = make_iterator(Shard, Replay),
ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID),
{ok, It};
Error ->
Error
end.
-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->
ok | {error, _TODO}.
discard_iterator(Shard, ReplayID) ->
iterator_delete(Shard, ReplayID).
-spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
ok | {error, _TODO}.
discard_iterator_prefix(Shard, KeyPrefix) ->
case do_discard_iterator_prefix(Shard, KeyPrefix) of
{ok, _} -> ok;
Error -> Error
end.
-spec list_iterator_prefix(
emqx_ds:shard(),
binary()
) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}.
list_iterator_prefix(Shard, KeyPrefix) ->
do_list_iterator_prefix(Shard, KeyPrefix).
-spec foldl_iterator_prefix(
emqx_ds:shard(),
binary(),
fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc),
Acc
) -> {ok, Acc} | {error, _TODO} when
Acc :: term().
foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc).
%%================================================================================
%% gen_server
%%================================================================================
-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
{ok, pid()}.
start_link(Shard, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
init({Shard, Options}) ->
process_flag(trap_exit, true),
{ok, S0} = do_open_db(Shard, Options),
S = ensure_current_generation(S0),
ok = populate_metadata(S),
{ok, S}.
handle_call({create_generation, Since, Config}, _From, S) ->
case create_new_gen(Since, Config, S) of
{ok, GenId, NS} ->
{reply, {ok, GenId}, NS};
{error, _} = Error ->
{reply, Error, S}
end;
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, #s{db = DB, shard = Shard}) ->
meta_erase(Shard),
ok = rocksdb:close(DB).
%%================================================================================
%% Internal functions
%%================================================================================
-record(db, {handle :: rocksdb:db_handle(), cf_iterator :: rocksdb:cf_handle()}).
-spec populate_metadata(state()) -> ok.
populate_metadata(S = #s{shard = Shard, db = DBHandle, cf_iterator = CFIterator}) ->
ok = meta_put(Shard, db, #db{handle = DBHandle, cf_iterator = CFIterator}),
Current = schema_get_current(DBHandle),
lists:foreach(fun(GenId) -> populate_metadata(GenId, S) end, lists:seq(0, Current)).
-spec populate_metadata(gen_id(), state()) -> ok.
populate_metadata(GenId, S = #s{shard = Shard, db = DBHandle}) ->
Gen = open_gen(GenId, schema_get_gen(DBHandle, GenId), S),
meta_register_gen(Shard, GenId, Gen).
-spec ensure_current_generation(state()) -> state().
ensure_current_generation(S = #s{shard = _Shard, keyspace = Keyspace, db = DBHandle}) ->
case schema_get_current(DBHandle) of
undefined ->
Config = emqx_ds_conf:keyspace_config(Keyspace),
{ok, _, NS} = create_new_gen(0, Config, S),
NS;
_GenId ->
S
end.
-spec create_new_gen(emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, gen_id(), state()} | {error, nonmonotonic}.
create_new_gen(Since, Config, S = #s{shard = Shard, db = DBHandle}) ->
GenId = get_next_id(meta_get_current(Shard)),
GenId = get_next_id(schema_get_current(DBHandle)),
case is_gen_valid(Shard, GenId, Since) of
ok ->
{ok, Gen, NS} = create_gen(GenId, Since, Config, S),
%% TODO: Transaction? Column family creation can't be transactional, anyway.
ok = schema_put_gen(DBHandle, GenId, Gen),
ok = schema_put_current(DBHandle, GenId),
ok = meta_register_gen(Shard, GenId, open_gen(GenId, Gen, NS)),
{ok, GenId, NS};
{error, _} = Error ->
Error
end.
-spec create_gen(gen_id(), emqx_ds:time(), emqx_ds_conf:backend_config(), state()) ->
{ok, generation(), state()}.
create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations = CFs}) ->
% TODO: Backend implementation should ensure idempotency.
{Schema, NewCFs} = Module:create_new(DBHandle, GenId, Options),
Gen = #{
module => Module,
data => Schema,
since => Since
},
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec do_open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
do_open_db(Shard, Options) ->
DefaultDir = binary_to_list(Shard),
DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
%% TODO: properly forward keyspace
Keyspace = maps:get(keyspace, Options, default_keyspace),
DBOptions = [
{create_if_missing, true},
{create_missing_column_families, true}
| emqx_ds_conf:db_options(Keyspace)
],
_ = filelib:ensure_dir(DBDir),
ExistingCFs =
case rocksdb:list_column_families(DBDir, DBOptions) of
{ok, CFs} ->
[{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
% DB is not present. First start
{error, {db_open, _}} ->
[]
end,
ColumnFamilies = [
{?DEFAULT_CF, ?DEFAULT_CF_OPTS},
{?ITERATOR_CF, ?ITERATOR_CF_OPTS}
| ExistingCFs
],
case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{
shard = Shard,
keyspace = Keyspace,
db = DBHandle,
cf_iterator = CFIterator,
cf_generations = lists:zip(CFNames, CFRefs)
}};
Error ->
Error
end.
-spec open_gen(gen_id(), generation(), state()) -> generation().
open_gen(
GenId,
Gen = #{module := Mod, data := Data},
#s{shard = Shard, db = DBHandle, cf_generations = CFs}
) ->
DB = Mod:open(Shard, DBHandle, GenId, CFs, Data),
Gen#{data := DB}.
-spec open_next_iterator(iterator()) -> {ok, iterator()} | {error, _Reason} | none.
open_next_iterator(It = #it{shard = Shard, gen = GenId}) ->
open_next_iterator(meta_get_gen(Shard, GenId + 1), It#it{gen = GenId + 1}).
open_next_iterator(undefined, _It) ->
none;
open_next_iterator(Gen = #{}, It) ->
open_iterator(Gen, It).
-spec open_restore_iterator(generation(), iterator(), binary()) ->
{ok, iterator()} | {error, _Reason}.
open_restore_iterator(#{module := Mod, data := Data}, It = #it{}, Serial) ->
case Mod:restore_iterator(Data, Serial) of
{ok, ItData} ->
{ok, It#it{module = Mod, data = ItData}};
Err ->
Err
end.
%%
-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>).
-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin
<<IteratorId:(size(KeyReplayState) - 2)/binary, "rs">> = (KeyReplayState),
IteratorId
end).
-define(ITERATION_WRITE_OPTS, []).
-define(ITERATION_READ_OPTS, []).
iterator_get_state(Shard, ReplayID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:get(Handle, CF, ?KEY_REPLAY_STATE(ReplayID), ?ITERATION_READ_OPTS).
iterator_put_state(ID, It = #it{shard = Shard}) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
Serial = preserve_iterator_state(It),
rocksdb:put(Handle, CF, ?KEY_REPLAY_STATE(ID), Serial, ?ITERATION_WRITE_OPTS).
iterator_delete(Shard, ID) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
rocksdb:delete(Handle, CF, ?KEY_REPLAY_STATE(ID), ?ITERATION_WRITE_OPTS).
preserve_iterator_state(#it{
gen = Gen,
replay = {TopicFilter, StartTime},
module = Mod,
data = ItData
}) ->
term_to_binary(#{
v => 1,
gen => Gen,
filter => TopicFilter,
start => StartTime,
st => Mod:preserve_iterator(ItData)
}).
restore_iterator_state(Shard, Serial) when is_binary(Serial) ->
restore_iterator_state(Shard, binary_to_term(Serial));
restore_iterator_state(
Shard,
#{
v := 1,
gen := Gen,
filter := TopicFilter,
start := StartTime,
st := State
}
) ->
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
do_list_iterator_prefix(Shard, KeyPrefix) ->
Fn = fun(K0, _V, Acc) ->
K = ?KEY_REPLAY_STATE_PAT(K0),
[K | Acc]
end,
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []).
do_discard_iterator_prefix(Shard, KeyPrefix) ->
#db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok).
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) ->
#db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db),
case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of
{ok, It} ->
NextAction = {seek, KeyPrefix},
do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc);
Error ->
Error
end.
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) ->
case rocksdb:iterator_move(It, NextAction) of
{ok, K = <<KeyPrefix:(size(KeyPrefix))/binary, _/binary>>, V} ->
NewAcc = Fn(K, V, Acc),
do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc);
{ok, _K, _V} ->
ok = rocksdb:iterator_close(It),
{ok, Acc};
{error, invalid_iterator} ->
ok = rocksdb:iterator_close(It),
{ok, Acc};
Error ->
ok = rocksdb:iterator_close(It),
Error
end.
%% Functions for dealing with the metadata stored persistently in rocksdb
-define(CURRENT_GEN, <<"current">>).
-define(SCHEMA_WRITE_OPTS, []).
-define(SCHEMA_READ_OPTS, []).
-spec schema_get_gen(rocksdb:db_handle(), gen_id()) -> generation().
schema_get_gen(DBHandle, GenId) ->
{ok, Bin} = rocksdb:get(DBHandle, schema_gen_key(GenId), ?SCHEMA_READ_OPTS),
binary_to_term(Bin).
-spec schema_put_gen(rocksdb:db_handle(), gen_id(), generation()) -> ok | {error, _}.
schema_put_gen(DBHandle, GenId, Gen) ->
rocksdb:put(DBHandle, schema_gen_key(GenId), term_to_binary(Gen), ?SCHEMA_WRITE_OPTS).
-spec schema_get_current(rocksdb:db_handle()) -> gen_id() | undefined.
schema_get_current(DBHandle) ->
case rocksdb:get(DBHandle, ?CURRENT_GEN, ?SCHEMA_READ_OPTS) of
{ok, Bin} ->
binary_to_integer(Bin);
not_found ->
undefined
end.
-spec schema_put_current(rocksdb:db_handle(), gen_id()) -> ok | {error, _}.
schema_put_current(DBHandle, GenId) ->
rocksdb:put(DBHandle, ?CURRENT_GEN, integer_to_binary(GenId), ?SCHEMA_WRITE_OPTS).
-spec schema_gen_key(integer()) -> binary().
schema_gen_key(N) ->
<<"gen", N:32>>.
-undef(CURRENT_GEN).
-undef(SCHEMA_WRITE_OPTS).
-undef(SCHEMA_READ_OPTS).
%% Functions for dealing with the runtime shard metadata:
-define(PERSISTENT_TERM(SHARD, GEN), {emqx_ds_storage_layer, SHARD, GEN}).
-spec meta_register_gen(emqx_ds:shard(), gen_id(), generation()) -> ok.
meta_register_gen(Shard, GenId, Gen) ->
Gs =
case GenId > 0 of
true -> meta_lookup(Shard, GenId - 1);
false -> []
end,
ok = meta_put(Shard, GenId, [Gen | Gs]),
ok = meta_put(Shard, current, GenId).
-spec meta_lookup_gen(emqx_ds:shard(), emqx_ds:time()) -> {gen_id(), generation()}.
meta_lookup_gen(Shard, Time) ->
%% TODO
%% Is cheaper persistent term GC on update here worth extra lookup? I'm leaning
%% towards a "no".
Current = meta_lookup(Shard, current),
Gens = meta_lookup(Shard, Current),
find_gen(Time, Current, Gens).
find_gen(Time, GenId, [Gen = #{since := Since} | _]) when Time >= Since ->
{GenId, Gen};
find_gen(Time, GenId, [_Gen | Rest]) ->
find_gen(Time, GenId - 1, Rest).
-spec meta_get_gen(emqx_ds:shard(), gen_id()) -> generation() | undefined.
meta_get_gen(Shard, GenId) ->
case meta_lookup(Shard, GenId, []) of
[Gen | _Older] -> Gen;
[] -> undefined
end.
-spec meta_get_current(emqx_ds:shard()) -> gen_id() | undefined.
meta_get_current(Shard) ->
meta_lookup(Shard, current, undefined).
-spec meta_lookup(emqx_ds:shard(), _K) -> _V.
meta_lookup(Shard, Key) ->
persistent_term:get(?PERSISTENT_TERM(Shard, Key)).
-spec meta_lookup(emqx_ds:shard(), _K, Default) -> _V | Default.
meta_lookup(Shard, K, Default) ->
persistent_term:get(?PERSISTENT_TERM(Shard, K), Default).
-spec meta_put(emqx_ds:shard(), _K, _V) -> ok.
meta_put(Shard, K, V) ->
persistent_term:put(?PERSISTENT_TERM(Shard, K), V).
-spec meta_erase(emqx_ds:shard()) -> ok.
meta_erase(Shard) ->
[
persistent_term:erase(K)
|| {K = ?PERSISTENT_TERM(Z, _), _} <- persistent_term:get(), Z =:= Shard
],
ok.
-undef(PERSISTENT_TERM).
get_next_id(undefined) -> 0;
get_next_id(GenId) -> GenId + 1.
is_gen_valid(Shard, GenId, Since) when GenId > 0 ->
[GenPrev | _] = meta_lookup(Shard, GenId - 1),
case GenPrev of
#{since := SincePrev} when Since > SincePrev ->
ok;
#{} ->
{error, nonmonotonic}
end;
is_gen_valid(_Shard, 0, 0) ->
ok.
serialize(Msg) ->
%% TODO: remove topic, GUID, etc. from the stored
%% message. Reconstruct it from the metadata.
term_to_binary(emqx_message:to_map(Msg)).
deserialize(Bin) ->
emqx_message:from_map(binary_to_term(Bin)).
%% -spec store_cfs(rocksdb:db_handle(), [{string(), rocksdb:cf_handle()}]) -> ok.
%% store_cfs(DBHandle, CFRefs) ->
%% lists:foreach(
%% fun({CFName, CFRef}) ->
%% persistent_term:put({self(), CFName}, {DBHandle, CFRef})
%% end,
%% CFRefs).

View File

@ -83,15 +83,11 @@
-export([create_new/3, open/5]). -export([create_new/3, open/5]).
-export([make_keymapper/1]). -export([make_keymapper/1]).
-export([store/5]). -export([store/5, delete/4]).
-export([delete/4]).
-export([get_streams/2]). -export([get_streams/3, make_iterator/3, next/1]).
-export([make_iterator/3, next/1]).
-export([preserve_iterator/1]). -export([preserve_iterator/1, restore_iterator/2, refresh_iterator/1]).
-export([restore_iterator/2]).
-export([refresh_iterator/1]).
%% Debug/troubleshooting: %% Debug/troubleshooting:
%% Keymappers %% Keymappers
@ -131,7 +127,7 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-opaque stream() :: singleton_stream. -opaque stream() :: emqx_ds:topic_filter().
-type topic() :: emqx_ds:topic(). -type topic() :: emqx_ds:topic().
-type topic_filter() :: emqx_ds:topic_filter(). -type topic_filter() :: emqx_ds:topic_filter().
@ -290,10 +286,10 @@ delete(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic
Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper), Key = make_message_key(Topic, PublishedAt, MessageID, DB#db.keymapper),
rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options). rocksdb:delete(DBHandle, CFHandle, Key, DB#db.write_options).
-spec get_streams(db(), emqx_ds:reply()) -> -spec get_streams(db(), emqx_ds:topic_filter(), emqx_ds:time()) ->
[stream()]. [stream()].
get_streams(_, _) -> get_streams(_, TopicFilter, _) ->
[singleton_stream]. [{0, TopicFilter}].
-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> -spec make_iterator(db(), emqx_ds:replay(), iteration_options()) ->
% {error, invalid_start_time}? might just start from the beginning of time % {error, invalid_start_time}? might just start from the beginning of time

View File

@ -6,7 +6,7 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API: %% API:
-export([start_link/0, start_shard/2, stop_shard/1]). -export([start_link/0, start_shard/2, stop_shard/1, ensure_shard/2]).
%% behaviour callbacks: %% behaviour callbacks:
-export([init/1]). -export([init/1]).

View File

@ -0,0 +1,136 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Reference implementation of the storage.
%%
%% Trivial, extremely slow and inefficient. It also doesn't handle
%% restart of the Erlang node properly, so obviously it's only to be
%% used for testing.
-module(emqx_ds_storage_reference).
-behavior(emqx_ds_storage_layer).
%% API:
-export([]).
%% behavior callbacks:
-export([create/4, open/5, store_batch/4, get_streams/4, make_iterator/4, next/4]).
%% internal exports:
-export([]).
-export_type([]).
-include_lib("emqx/include/emqx.hrl").
%%================================================================================
%% Type declarations
%%================================================================================
%% Permanent state:
-record(schema, {}).
%% Runtime state:
-record(s, {
db :: rocksdb:db_handle(),
cf :: rocksdb:cf_handle()
}).
-record(stream, {topic_filter :: emqx_ds:topic_filter()}).
-record(it, {
topic_filter :: emqx_ds:topic_filter(),
start_time :: emqx_ds:time(),
last_seen_message_key = first :: binary() | first
}).
%%================================================================================
%% API funcions
%%================================================================================
%%================================================================================
%% behavior callbacks
%%================================================================================
create(_ShardId, DBHandle, GenId, _Options) ->
CFName = data_cf(GenId),
{ok, CFHandle} = rocksdb:create_column_family(DBHandle, CFName, []),
Schema = #schema{},
{Schema, [{CFName, CFHandle}]}.
open(_Shard, DBHandle, GenId, CFRefs, #schema{}) ->
{_, CF} = lists:keyfind(data_cf(GenId), 1, CFRefs),
#s{db = DBHandle, cf = CF}.
store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
lists:foreach(
fun(Msg) ->
Id = erlang:unique_integer([monotonic]),
Key = <<Id:64>>,
Val = term_to_binary(Msg),
rocksdb:put(DB, CF, Key, Val, [])
end,
Messages
).
get_streams(_Shard, _Data, TopicFilter, _StartTime) ->
[#stream{topic_filter = TopicFilter}].
make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) ->
{ok, #it{
topic_filter = TopicFilter,
start_time = StartTime
}}.
next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
#it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
{ok, ITHandle} = rocksdb:iterator(DB, CF, []),
Action = case Key0 of
first ->
first;
_ ->
rocksdb:iterator_move(ITHandle, Key0),
next
end,
{Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
rocksdb:iterator_close(ITHandle),
It = It0#it{last_seen_message_key = Key},
{ok, It, lists:reverse(Messages)}.
%%================================================================================
%% Internal functions
%%================================================================================
do_next(_, _, _, _, 0, Key, Acc) ->
{Key, Acc};
do_next(TopicFilter, StartTime, IT, Action, NLeft, Key0, Acc) ->
case rocksdb:iterator_move(IT, Action) of
{ok, Key, Blob} ->
Msg = #message{topic = Topic, timestamp = TS} = binary_to_term(Blob),
case emqx_topic:match(Topic, TopicFilter) andalso TS >= StartTime of
true ->
do_next(TopicFilter, StartTime, IT, next, NLeft - 1, Key, [Msg | Acc]);
false ->
do_next(TopicFilter, StartTime, IT, next, NLeft, Key, Acc)
end;
{error, invalid_iterator} ->
{Key0, Acc}
end.
%% @doc Generate a column family ID for the MQTT messages
-spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
data_cf(GenId) ->
?MODULE_STRING ++ integer_to_list(GenId).

View File

@ -17,8 +17,9 @@
-behavior(emqx_bpapi). -behavior(emqx_bpapi).
-include_lib("emqx/include/bpapi.hrl").
%% API: %% API:
-export([]). -export([open_shard/3, get_streams/4, make_iterator/4, next/4]).
%% behavior callbacks: %% behavior callbacks:
-export([introduced_in/0]). -export([introduced_in/0]).
@ -27,23 +28,29 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-spec create_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) -> -spec open_shard(node(), emqx_ds_replication_layer:shard(), emqx_ds:create_db_opts()) ->
ok. ok.
create_shard(Node, Shard, Opts) -> open_shard(Node, Shard, Opts) ->
erpc:call(Node, emqx_ds_replication_layer, do_create_shard_v1, [Shard, Opts]). erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).
-spec get_streams(node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec get_streams(
[emqx_ds_replication_layer:stream()]. node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()
) ->
[{integer(), emqx_ds_replication_layer:stream()}].
get_streams(Node, Shard, TopicFilter, Time) -> get_streams(Node, Shard, TopicFilter, Time) ->
erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]). erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [Shard, TopicFilter, Time]).
-spec open_iterator(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:stream(), emqx_ds:time()) -> -spec make_iterator(node(), emqx_ds_replication_layer:shard(), _Stream, emqx_ds:time()) ->
{ok, emqx_ds_replication_layer:iterator()} | {error, _}. {ok, emqx_ds_replication_layer:iterator()} | {error, _}.
open_iterator(Node, Shard, Stream, StartTime) -> make_iterator(Node, Shard, Stream, StartTime) ->
erpc:call(Node, emqx_ds_replication_layer, do_open_iterator_v1, [Shard, Stream, StartTime]). erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v1, [Shard, Stream, StartTime]).
-spec next(node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()) -> -spec next(
{ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]} | end_of_stream. node(), emqx_ds_replication_layer:shard(), emqx_ds_replication_layer:iterator(), pos_integer()
) ->
{ok, emqx_ds_replication_layer:iterator(), [emqx_types:messages()]}
| {ok, end_of_stream}
| {error, _}.
next(Node, Shard, Iter, BatchSize) -> next(Node, Shard, Iter, BatchSize) ->
erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]). erpc:call(Node, emqx_ds_replication_layer, do_next_v1, [Shard, Iter, BatchSize]).

View File

@ -0,0 +1,107 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
%% A simple smoke test that verifies that opening the DB doesn't crash
t_00_smoke_open(_Config) ->
?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})),
?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})).
%% A simple smoke test that verifies that storing the messages doesn't
%% crash
t_01_smoke_store(_Config) ->
DB = <<"default">>,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
Msg = message(<<"foo/bar">>, <<"foo">>, 0),
?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).
%% A simple smoke test that verifies that getting the list of streams
%% doesn't crash and that iterators can be opened.
t_02_smoke_get_streams_start_iter(_Config) ->
DB = <<"default">>,
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
StartTime = 0,
[{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime),
?assertMatch({_, _}, Rank),
?assertMatch({ok, _Iter}, emqx_ds:make_iterator(Stream, StartTime)).
%% A simple smoke test that verifies that it's possible to iterate
%% over messages.
t_03_smoke_iterate(_Config) ->
DB = atom_to_binary(?FUNCTION_NAME),
?assertMatch(ok, emqx_ds:open_db(DB, #{})),
StartTime = 0,
Msgs = [
message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo">>, <<"2">>, 1),
message(<<"bar/bar">>, <<"3">>, 2)
],
?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
[{_, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime),
{ok, Iter0} = emqx_ds:make_iterator(Stream, StartTime),
{ok, Iter, Batch} = iterate(Iter0, 1),
?assertEqual(Msgs, Batch, {Iter0, Iter}).
message(Topic, Payload, PublishedAt) ->
#message{
topic = Topic,
payload = Payload,
timestamp = PublishedAt,
id = emqx_guid:gen()
}.
iterate(It, BatchSize) ->
iterate(It, BatchSize, []).
iterate(It0, BatchSize, Acc) ->
case emqx_ds:next(It0, BatchSize) of
{ok, It, []} ->
{ok, It, Acc};
{ok, It, Msgs} ->
iterate(It, BatchSize, Acc ++ Msgs);
Ret ->
Ret
end.
%% CT callbacks
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[mria, emqx_durable_storage],
#{work_dir => ?config(priv_dir, Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
init_per_testcase(_TC, Config) ->
snabbkaffe:fix_ct_logging(),
application:ensure_all_started(emqx_durable_storage),
Config.
end_per_testcase(_TC, _Config) ->
ok = application:stop(emqx_durable_storage).

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir #! /usr/bin/env elixir
defmodule CheckElixirApplications do defmodule CheckElixirApplications do
alias EMQXUmbrella.MixProject alias EMQXUmbrella.MixProject

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir #! /usr/bin/env elixir
# ensure we have a fresh rebar.lock # ensure we have a fresh rebar.lock

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir #! /usr/bin/env elixir
defmodule CheckElixirEMQXMachineBootDiscrepancies do defmodule CheckElixirEMQXMachineBootDiscrepancies do
alias EMQXUmbrella.MixProject alias EMQXUmbrella.MixProject

View File

@ -1,4 +1,4 @@
#!/usr/bin/env elixir #! /usr/bin/env elixir
alias EMQXUmbrella.MixProject alias EMQXUmbrella.MixProject