From ac56de9fc57b591b44fdc9618d8100d7d4c339c8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 6 Jul 2023 17:42:24 +0200 Subject: [PATCH] feat(ds): allow to redefine directory for shard's rocksdb --- .../src/emqx_ds_message_storage_bitmask.erl | 2 +- .../src/emqx_ds_storage_layer.erl | 40 +++++++++++-------- .../src/emqx_ds_storage_layer_sup.erl | 16 ++++---- .../test/emqx_ds_storage_layer_SUITE.erl | 4 +- 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 5bb0423d5..57608e5cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -175,7 +175,7 @@ cf :: rocksdb:cf_handle(), keymapper :: keymapper(), write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), - read_options = [] :: emqx_ds_storage_layer:db_write_options() + read_options = [] :: emqx_ds_storage_layer:db_read_options() }). -record(it, { diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 43a399a1b..017423b02 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -6,7 +6,7 @@ -behaviour(gen_server). %% API: --export([start_link/1]). +-export([start_link/2]). -export([create_generation/3]). -export([store/5]). @@ -18,7 +18,8 @@ %% behaviour callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]). +-export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]). +-export_type([db_options/0, db_write_options/0, db_read_options/0]). -compile({inline, [meta_lookup/2]}). @@ -26,10 +27,16 @@ %% Type declarations %%================================================================================ -%% see rocksdb:db_options() -% -type options() :: proplists:proplist(). +-type options() :: #{ + dir => file:filename() +}. +%% see rocksdb:db_options() +-type db_options() :: proplists:proplist(). +%% see rocksdb:write_options() -type db_write_options() :: proplists:proplist(). +%% see rocksdb:read_options() +-type db_read_options() :: proplists:proplist(). -type cf_refs() :: [{string(), rocksdb:cf_handle()}]. @@ -110,18 +117,16 @@ %% API funcions %%================================================================================ --spec start_link(emqx_ds:shard()) -> {ok, pid()}. -start_link(Shard) -> - gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). +-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}. +start_link(Shard, Options) -> + gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []). -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> {ok, gen_id()} | {error, nonmonotonic}. create_generation(Shard, Since, Config = {_Module, _Options}) -> gen_server:call(?REF(Shard), {create_generation, Since, Config}). --spec store( - emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary() -) -> +-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), @@ -181,9 +186,9 @@ discard_iterator(Shard, ReplayID) -> %% behaviour callbacks %%================================================================================ -init([Shard]) -> +init({Shard, Options}) -> process_flag(trap_exit, true), - {ok, S0} = open_db(Shard), + {ok, S0} = open_db(Shard, Options), S = ensure_current_generation(S0), ok = populate_metadata(S), {ok, S}. @@ -265,16 +270,17 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations }, {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. --spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}. -open_db(Shard) -> - Filename = binary_to_list(Shard), +-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}. +open_db(Shard, Options) -> + DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)), DBOptions = [ {create_if_missing, true}, {create_missing_column_families, true} | emqx_ds_conf:db_options() ], + _ = filelib:ensure_dir(DBDir), ExistingCFs = - case rocksdb:list_column_families(Filename, DBOptions) of + case rocksdb:list_column_families(DBDir, DBOptions) of {ok, CFs} -> [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; % DB is not present. First start @@ -286,7 +292,7 @@ open_db(Shard) -> {?ITERATOR_CF, ?ITERATOR_CF_OPTS} | ExistingCFs ], - case rocksdb:open(Filename, DBOptions, ColumnFamilies) of + case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> {CFNames, _} = lists:unzip(ExistingCFs), {ok, #s{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl index ed745df5f..56c8c760a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl @@ -6,7 +6,7 @@ -behaviour(supervisor). %% API: --export([start_link/0, start_shard/1, stop_shard/1]). +-export([start_link/0, start_shard/2, stop_shard/1]). %% behaviour callbacks: -export([init/1]). @@ -25,9 +25,10 @@ start_link() -> supervisor:start_link({local, ?SUP}, ?MODULE, []). --spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret(). -start_shard(Shard) -> - supervisor:start_child(?SUP, shard_child_spec(Shard)). +-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> + supervisor:startchild_ret(). +start_shard(Shard, Options) -> + supervisor:start_child(?SUP, shard_child_spec(Shard, Options)). -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. stop_shard(Shard) -> @@ -51,11 +52,12 @@ init([]) -> %% Internal functions %%================================================================================ --spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec(). -shard_child_spec(Shard) -> +-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> + supervisor:child_spec(). +shard_child_spec(Shard, Options) -> #{ id => Shard, - start => {emqx_ds_storage_layer, start_link, [Shard]}, + start => {emqx_ds_storage_layer, start_link, [Shard, Options]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl index 46a1436bb..c5c227333 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl @@ -33,7 +33,7 @@ %% Smoke test for opening and reopening the database t_open(_Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD). + {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}). %% Smoke test of store function t_store(_Config) -> @@ -263,7 +263,7 @@ end_per_suite(_Config) -> init_per_testcase(TC, Config) -> ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), - {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)), + {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}), Config. end_per_testcase(TC, _Config) ->