From 6d65707d41df06219ba63b1c4e735c52444c7981 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 5 Oct 2023 01:43:35 +0200 Subject: [PATCH] refactor(ds): Implement drop_db function --- apps/emqx_durable_storage/src/emqx_ds.erl | 24 ++++++++++++---- .../src/emqx_ds_replication_layer.erl | 28 ++++++++++++++----- .../src/emqx_ds_storage_layer.erl | 18 ++++++++---- .../src/emqx_ds_storage_reference.erl | 15 +++++----- .../src/proto/emqx_ds_proto_v1.erl | 7 ++++- .../test/emqx_ds_SUITE.erl | 17 ++++++----- 6 files changed, 77 insertions(+), 32 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 6a20afbf1..293f2e531 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -22,7 +22,7 @@ -module(emqx_ds). %% Management API: --export([open_db/2]). +-export([open_db/2, drop_db/1]). %% Message storage API: -export([store_batch/1, store_batch/2, store_batch/3]). @@ -50,7 +50,7 @@ %% Type declarations %%================================================================================ --type db() :: emqx_ds_replication_layer:db(). +-type db() :: atom(). %% Parsed topic. -type topic() :: list(binary()). @@ -101,6 +101,12 @@ open_db(DB, Opts) -> emqx_ds_replication_layer:open_db(DB, Opts). +%% @doc TODO: currently if one or a few shards are down, they won't be +%% deleted. +-spec drop_db(db()) -> ok. +drop_db(DB) -> + emqx_ds_replication_layer:drop_db(DB). + -spec store_batch([emqx_types:message()]) -> store_batch_result(). store_batch(Msgs) -> store_batch(?DEFAULT_DB, Msgs, #{}). @@ -124,7 +130,15 @@ store_batch(DB, Msgs) -> %% reflects the notion that different topics can be stored %% differently, but hides the implementation details. %% -%% Rules: +%% While having to work with multiple iterators to replay a topic +%% filter may be cumbersome, it opens up some possibilities: +%% +%% 1. It's possible to parallelize replays +%% +%% 2. Streams can be shared between different clients to implement +%% shared subscriptions +%% +%% IMPORTANT RULES: %% %% 0. There is no 1-to-1 mapping between MQTT topics and streams. One %% stream can contain any number of MQTT topics. @@ -145,8 +159,8 @@ store_batch(DB, Msgs) -> %% equal, then the streams are independent. %% %% Stream is fully consumed when `next/3' function returns -%% `end_of_stream'. Then the client can proceed to replaying streams -%% that depend on the given one. +%% `end_of_stream'. Then and only then the client can proceed to +%% replaying streams that depend on the given one. -spec get_streams(db(), topic_filter(), time()) -> [{stream_rank(), stream()}]. get_streams(DB, TopicFilter, StartTime) -> emqx_ds_replication_layer:get_streams(DB, TopicFilter, StartTime). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 5d4749c30..e1c775d5a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -18,6 +18,7 @@ -export([ list_shards/1, open_db/2, + drop_db/1, store_batch/3, get_streams/3, make_iterator/2, @@ -27,6 +28,7 @@ %% internal exports: -export([ do_open_shard_v1/2, + do_drop_shard_v1/1, do_get_streams_v1/3, do_make_iterator_v1/3, do_next_v1/3 @@ -38,9 +40,9 @@ %% Type declarations %%================================================================================ --type db() :: binary(). +-type db() :: emqx_ds:db(). --type shard_id() :: binary(). +-type shard_id() :: {emqx_ds:db(), atom()}. %% This record enapsulates the stream entity from the replication %% level. @@ -90,6 +92,16 @@ open_db(DB, Opts) -> list_nodes() ). +-spec drop_db(emqx_ds:db()) -> ok | {error, _}. +drop_db(DB) -> + lists:foreach( + fun(Node) -> + Shard = shard_id(DB, Node), + ok = emqx_ds_proto_v1:drop_shard(Node, Shard) + end, + list_nodes() + ). + -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(DB, Msg, Opts) -> @@ -163,6 +175,10 @@ next(Iter0, BatchSize) -> do_open_shard_v1(Shard, Opts) -> emqx_ds_storage_layer:open_shard(Shard, Opts). +-spec do_drop_shard_v1(shard_id()) -> ok. +do_drop_shard_v1(Shard) -> + emqx_ds_storage_layer:drop_shard(Shard). + -spec do_get_streams_v1(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{integer(), _Stream}]. do_get_streams_v1(Shard, TopicFilter, StartTime) -> @@ -187,13 +203,11 @@ add_shard_to_rank(Shard, RankY) -> shard_id(DB, Node) -> %% TODO: don't bake node name into the schema, don't repeat the %% Mnesia's 1M$ mistake. - NodeBin = atom_to_binary(Node), - <>. + {DB, Node}. -spec node_of_shard(shard_id()) -> node(). -node_of_shard(ShardId) -> - [_DB, NodeBin] = binary:split(ShardId, <<":">>), - binary_to_atom(NodeBin). +node_of_shard({_DB, Node}) -> + Node. list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index fdd81a095..d531c5985 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,13 +18,13 @@ -behaviour(gen_server). %% Replication layer API: --export([open_shard/2, store_batch/3, get_streams/3, make_iterator/3, next/3]). +-export([open_shard/2, drop_shard/1, store_batch/3, get_streams/3, make_iterator/3, next/3]). %% gen_server -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([]). +-export([drop_shard/1]). -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]). @@ -124,6 +124,11 @@ open_shard(Shard, Options) -> emqx_ds_storage_layer_sup:ensure_shard(Shard, Options). +-spec drop_shard(shard_id()) -> ok. +drop_shard(Shard) -> + emqx_ds_storage_layer_sup:stop_shard(Shard), + ok = rocksdb:destroy(db_dir(Shard), []). + -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). store_batch(Shard, Messages, Options) -> @@ -188,7 +193,7 @@ next(Shard, Iter = #it{generation = GenId, enc = GenIter0}, BatchSize) -> -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). --spec start_link(emqx_ds:shard_id(), emqx_ds:create_db_opts()) -> +-spec start_link(shard_id(), emqx_ds:create_db_opts()) -> {ok, pid()}. start_link(Shard, Options) -> gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). @@ -303,13 +308,12 @@ commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB -spec rocksdb_open(shard_id(), emqx_ds:create_db_opts()) -> {ok, rocksdb:db_handle(), cf_refs()} | {error, _TODO}. rocksdb_open(Shard, Options) -> - DefaultDir = binary_to_list(Shard), - DBDir = unicode:characters_to_list(maps:get(dir, Options, DefaultDir)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} | maps:get(db_options, Options, []) ], + DBDir = db_dir(Shard), _ = filelib:ensure_dir(DBDir), ExistingCFs = case rocksdb:list_column_families(DBDir, DBOptions) of @@ -331,6 +335,10 @@ rocksdb_open(Shard, Options) -> Error end. +-spec db_dir(shard_id()) -> file:filename(). +db_dir({DB, ShardId}) -> + lists:flatten([atom_to_list(DB), $:, atom_to_list(ShardId)]). + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 1fbad5f1b..c0fb29ceb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -98,13 +98,14 @@ make_iterator(_Shard, _Data, #stream{topic_filter = TopicFilter}, StartTime) -> next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), - Action = case Key0 of - first -> - first; - _ -> - rocksdb:iterator_move(ITHandle, Key0), - next - end, + Action = + case Key0 of + first -> + first; + _ -> + rocksdb:iterator_move(ITHandle, Key0), + next + end, {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []), rocksdb:iterator_close(ITHandle), It = It0#it{last_seen_message_key = Key}, diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index df3d64bc3..60671cef7 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx/include/bpapi.hrl"). %% API: --export([open_shard/3, get_streams/4, make_iterator/4, next/4]). +-export([open_shard/3, drop_shard/2, get_streams/4, make_iterator/4, next/4]). %% behavior callbacks: -export([introduced_in/0]). @@ -33,6 +33,11 @@ open_shard(Node, Shard, Opts) -> erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [Shard, Opts]). +-spec drop_shard(node(), emqx_ds_replication_layer:shard()) -> + ok. +drop_shard(Node, Shard) -> + erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [Shard]). + -spec get_streams( node(), emqx_ds_replication_layer:shard(), emqx_ds:topic_filter(), emqx_ds:time() ) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index effe3b695..eabd03277 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -22,15 +22,18 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("stdlib/include/assert.hrl"). -%% A simple smoke test that verifies that opening the DB doesn't crash -t_00_smoke_open(_Config) -> - ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})), - ?assertMatch(ok, emqx_ds:open_db(<<"DB1">>, #{})). +%% A simple smoke test that verifies that opening/closing the DB +%% doesn't crash +t_00_smoke_open_drop(_Config) -> + DB = 'DB', + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:open_db(DB, #{})), + ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't %% crash t_01_smoke_store(_Config) -> - DB = <<"default">>, + DB = default, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), Msg = message(<<"foo/bar">>, <<"foo">>, 0), ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])). @@ -38,7 +41,7 @@ t_01_smoke_store(_Config) -> %% A simple smoke test that verifies that getting the list of streams %% doesn't crash and that iterators can be opened. t_02_smoke_get_streams_start_iter(_Config) -> - DB = <<"default">>, + DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), StartTime = 0, [{Rank, Stream}] = emqx_ds:get_streams(DB, ['#'], StartTime), @@ -48,7 +51,7 @@ t_02_smoke_get_streams_start_iter(_Config) -> %% A simple smoke test that verifies that it's possible to iterate %% over messages. t_03_smoke_iterate(_Config) -> - DB = atom_to_binary(?FUNCTION_NAME), + DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, #{})), StartTime = 0, Msgs = [