refactor(ds): Implement drop_db function

ieQu1 2023-10-05 01:43:35 +02:00
parent c6a721a7eb
commit 6d65707d41
6 changed files with 77 additions and 32 deletions

View File

@@ -22,7 +22,7 @@
 -module(emqx_ds).

 %% Management API:
--export([open_db/2]).
+-export([open_db/2, drop_db/1]).

 %% Message storage API:
 -export([store_batch/1, store_batch/2, store_batch/3]).

@@ -50,7 +50,7 @@
 %% Type declarations
 %%================================================================================

--type db() :: emqx_ds_replication_layer:db().
+-type db() :: atom().

 %% Parsed topic.
 -type topic() :: list(binary()).
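Note the type move: a database is now identified by a plain atom, and the canonical `db()' type is owned by `emqx_ds' itself (the replication layer's `db()' is redefined below to point back here). This is also why the test suite at the end of this commit switches DB names from binaries such as `<<"default">>' to atoms.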
@@ -101,6 +101,12 @@
 open_db(DB, Opts) ->
     emqx_ds_replication_layer:open_db(DB, Opts).

+%% @doc TODO: currently if one or a few shards are down, they won't be
+%% deleted.
+-spec drop_db(db()) -> ok.
+drop_db(DB) ->
+    emqx_ds_replication_layer:drop_db(DB).
+
 -spec store_batch([emqx_types:message()]) -> store_batch_result().
 store_batch(Msgs) ->
     store_batch(?DEFAULT_DB, Msgs, #{}).
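A minimal lifecycle sketch of the management API as it stands after this commit; the DB name `my_db' is a placeholder and `Messages' is assumed to be a list of `emqx_types:message()':

    %% Sketch only: open_db/2 is idempotent (the smoke test below opens
    %% the same DB twice), and drop_db/1 wipes every reachable shard.
    ok = emqx_ds:open_db(my_db, #{}),
    ok = emqx_ds:store_batch(my_db, Messages),
    %% Per the TODO above, shards hosted on nodes that are down at this
    %% point are not deleted:
    ok = emqx_ds:drop_db(my_db).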
@@ -124,7 +130,15 @@ store_batch(DB, Msgs) ->
 %% reflects the notion that different topics can be stored
 %% differently, but hides the implementation details.
 %%
-%% Rules:
+%% While having to work with multiple iterators to replay a topic
+%% filter may be cumbersome, it opens up some possibilities:
+%%
+%% 1. It's possible to parallelize replays
+%%
+%% 2. Streams can be shared between different clients to implement
+%% shared subscriptions
+%%
+%% IMPORTANT RULES:
 %%
 %% 0. There is no 1-to-1 mapping between MQTT topics and streams. One
 %% stream can contain any number of MQTT topics.

@@ -145,8 +159,8 @@ store_batch(DB, Msgs) ->
 %% equal, then the streams are independent.
 %%
 %% Stream is fully consumed when `next/3' function returns
-%% `end_of_stream'. Then the client can proceed to replaying streams
-%% that depend on the given one.
+%% `end_of_stream'. Then and only then the client can proceed to
+%% replaying streams that depend on the given one.
 -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}].
 get_streams(DB, TopicFilter, StartTime) ->
     emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime).
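To make the consumption contract concrete, here is a minimal sequential replay sketch. The arities and return shapes used below (`make_iterator/2' returning `{ok, It}'; `next/2' returning `{ok, It, Msgs}' or `end_of_stream') are assumptions pieced together from the comments and exports in this diff, not verbatim API, and a real consumer must also respect the `stream_rank()' dependencies described above:

    %% Hedged sketch: replay every stream matching TopicFilter, one
    %% after another. Rank-based dependencies are ignored for brevity;
    %% handle_message/1 is a hypothetical callback.
    replay(DB, TopicFilter, StartTime) ->
        lists:foreach(
            fun({_Rank, Stream}) ->
                {ok, It} = emqx_ds:make_iterator(Stream, StartTime),
                consume(It)
            end,
            emqx_ds:get_streams(DB, TopicFilter, StartTime)
        ).

    consume(It0) ->
        case emqx_ds:next(It0, 100) of
            {ok, It, Msgs} ->
                lists:foreach(fun handle_message/1, Msgs),
                consume(It);
            end_of_stream ->
                ok
        end.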

View File

@@ -18,6 +18,7 @@
 -export([
     list_shards/1,
     open_db/2,
+    drop_db/1,
     store_batch/3,
     get_streams/3,
     make_iterator/2,

@@ -27,6 +28,7 @@
 %% internal exports:
 -export([
     do_open_shard_v1/2,
+    do_drop_shard_v1/1,
     do_get_streams_v1/3,
     do_make_iterator_v1/3,
     do_next_v1/3

@@ -38,9 +40,9 @@
 %% Type declarations
 %%================================================================================

--type db() :: binary().
+-type db() :: emqx_ds:db().

--type shard_id() :: binary().
+-type shard_id() :: {emqx_ds:db(), atom()}.

 %% This record enapsulates the stream entity from the replication
 %% level.

@@ -90,6 +92,16 @@ open_db(DB, Opts) ->
         list_nodes()
     ).

+-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
+drop_db(DB) ->
+    lists:foreach(
+        fun(Node) ->
+            Shard = shard_id(DB, Node),
+            ok = emqx_ds_proto_v1:drop_shard(Node, Shard)
+        end,
+        list_nodes()
+    ).
+
 -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Msg, Opts) ->
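Two things worth noting about the fan-out above: `list_nodes()' is `mria:running_nodes()' (see the end of this file), so shards hosted on nodes that are down are silently skipped; that is exactly the limitation flagged by the TODO on `emqx_ds:drop_db/1'. Also, `erpc:call/4' raises on RPC failure and each result is matched against `ok', so a failed remote drop crashes the caller rather than producing the `{error, _}' branch of the spec.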
@@ -163,6 +175,10 @@ next(Iter0, BatchSize) ->
 do_open_shard_v1(Shard, Opts) ->
     emqx_ds_storage_layer:open_shard(Shard, Opts).

+-spec do_drop_shard_v1(shard_id()) -> ok.
+do_drop_shard_v1(Shard) ->
+    emqx_ds_storage_layer:drop_shard(Shard).
+
 -spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     [{integer(), _Stream}].
 do_get_streams_v1(Shard, TopicFilter, StartTime) ->

@@ -187,13 +203,11 @@ add_shard_to_rank(Shard, RankY) ->
 shard_id(DB, Node) ->
     %% TODO: don't bake node name into the schema, don't repeat the
     %% Mnesia's 1M$ mistake.
-    NodeBin = atom_to_binary(Node),
-    <<DB/binary, ":", NodeBin/binary>>.
+    {DB, Node}.

 -spec node_of_shard(shard_id()) -> node().
-node_of_shard(ShardId) ->
-    [_DB, NodeBin] = binary:split(ShardId, <<":">>),
-    binary_to_atom(NodeBin).
+node_of_shard({_DB, Node}) ->
+    Node.

 list_nodes() ->
     mria:running_nodes().

View File

@@ -18,13 +18,13 @@
 -behaviour(gen_server).

 %% Replication layer API:
--export([open_shard/2, store_batch/3, get_streams/3, make_iterator/3, next/3]).
+-export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/3, next/3]).

 %% gen_server
 -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

 %% internal exports:
--export([]).
+-export([drop_shard/1]).

 -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]).

@@ -124,6 +124,11 @@
 open_shard(Shard, Options) ->
     emqx_ds_storage_layer_sup:ensure_shard(Shard, Options).

+-spec drop_shard(shard_id()) -> ok.
+drop_shard(Shard) ->
+    emqx_ds_storage_layer_sup:stop_shard(Shard),
+    ok = rocksdb:destroy(db_dir(Shard), []).
+
 -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
     emqx_ds:store_batch_result().
 store_batch(Shard, Messages, Options) ->
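A note on ordering inside `drop_shard/1': the shard's server is stopped before `rocksdb:destroy/2' runs, since RocksDB cannot destroy a database that is still open and holding its lock. The path handed to `destroy' comes from the new `db_dir/1' helper introduced further down in this file.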
@@ -188,7 +193,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) ->
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).

--spec start_link(emqx_ds:shard_id(), emqx_ds:create_db_opts()) ->
+-spec start_link(shard_id(), emqx_ds:create_db_opts()) ->
     {ok, pid()}.
 start_link(Shard, Options) ->
     gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).

@@ -303,13 +308,12 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB
 -spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) ->
     {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}.
 rocksdb_open(Shard, Options) ->
-    DefaultDir = binary_to_list(Shard),
-    DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)),
     DBOptions = [
         {create_if_missing, true},
         {create_missing_column_families, true}
         | maps:get(db_options, Options, [])
     ],
+    DBDir = db_dir(Shard),
     _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
         case rocksdb:list_column_families(DBDir, DBOptions) of

@@ -331,6 +335,10 @@ rocksdb_open(Shard, Options) ->
         Error
     end.

+-spec db_dir(shard_id()) -> file:filename().
+db_dir({DB, ShardId}) ->
+    lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)]).
+
 %%--------------------------------------------------------------------------------
 %% Schema access
 %%--------------------------------------------------------------------------------
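To illustrate `db_dir/1' above with a hypothetical shard id:

    %% Illustration only; the DB name and node are made up:
    db_dir({my_db, 'emqx@host1'}).
    %% => "my_db:emqx@host1"

The `$:' separator reproduces the old binary shard-id shape (`<<DB/binary, ":", NodeBin/binary>>'), so directory names on disk keep the same format. Note also that `rocksdb_open/2' no longer honors the `dir' override from the options map; the path is now derived from the shard id alone.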

View File

@@ -98,13 +98,14 @@ make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) ->
 next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) ->
     #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0,
     {ok, ITHandle} = rocksdb:iterator(DB, CF, []),
-    Action = case Key0 of
-        first ->
-            first;
-        _ ->
-            rocksdb:iterator_move(ITHandle, Key0),
-            next
-    end,
+    Action =
+        case Key0 of
+            first ->
+                first;
+            _ ->
+                rocksdb:iterator_move(ITHandle, Key0),
+                next
+        end,
     {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []),
     rocksdb:iterator_close(ITHandle),
     It = It0#it{last_seen_message_key = Key},

View File

@@ -19,7 +19,7 @@
 -include_lib("emqx/include/bpapi.hrl").

 %% API:
--export([open_shard/3, get_streams/4, make_iterator/4, next/4]).
+-export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/4, next/4]).

 %% behavior callbacks:
 -export([introduced_in/0]).

@@ -33,6 +33,11 @@
 open_shard(Node, Shard, Opts) ->
     erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]).

+-spec drop_shard(node(), emqx_ds_replication_layer:shard()) ->
+    ok.
+drop_shard(Node, Shard) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]).
+
 -spec get_streams(
     node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->

View File

@@ -22,15 +22,18 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").

-%% A simple smoke test that verifies that opening the DB doesn't crash
-t_00_smoke_open(_Config) ->
-    ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})),
-    ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})).
+%% A simple smoke test that verifies that opening/closing the DB
+%% doesn't crash
+t_00_smoke_open_drop(_Config) ->
+    DB = 'DB',
+    ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
+    ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
+    ?assertMatch(ok, emqx_ds:drop_db(DB)).

 %% A simple smoke test that verifies that storing the messages doesn't
 %% crash
 t_01_smoke_store(_Config) ->
-    DB = <<"default">>,
+    DB = default,
     ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
     Msg = message(<<"foo/bar">>, <<"foo">>, 0),
     ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])).

@@ -38,7 +41,7 @@ t_01_smoke_store(_Config) ->
 %% A simple smoke test that verifies that getting the list of streams
 %% doesn't crash and that iterators can be opened.
 t_02_smoke_get_streams_start_iter(_Config) ->
-    DB = <<"default">>,
+    DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
     StartTime = 0,
     [{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime),

@@ -48,7 +51,7 @@ t_02_smoke_get_streams_start_iter(_Config) ->
 %% A simple smoke test that verifies that it's possible to iterate
 %% over messages.
 t_03_smoke_iterate(_Config) ->
-    DB = atom_to_binary(?FUNCTION_NAME),
+    DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, #{})),
     StartTime = 0,
     Msgs = [