feat(ds): allow to redefine directory for shard's rocksdb

This commit is contained in:
Andrew Mayorov 2023-07-06 17:42:24 +02:00
parent 7e76914599
commit ac56de9fc5
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 35 additions and 27 deletions

View File

@ -175,7 +175,7 @@
cf :: rocksdb:cf_handle(), cf :: rocksdb:cf_handle(),
keymapper :: keymapper(), keymapper :: keymapper(),
write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(), write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
read_options = [] :: emqx_ds_storage_layer:db_write_options() read_options = [] :: emqx_ds_storage_layer:db_read_options()
}). }).
-record(it, { -record(it, {

View File

@ -6,7 +6,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API: %% API:
-export([start_link/1]). -export([start_link/2]).
-export([create_generation/3]). -export([create_generation/3]).
-export([store/5]). -export([store/5]).
@ -18,7 +18,8 @@
%% behaviour callbacks: %% behaviour callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]). -export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
-export_type([db_options/0, db_write_options/0, db_read_options/0]).
-compile({inline, [meta_lookup/2]}). -compile({inline, [meta_lookup/2]}).
@ -26,10 +27,16 @@
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
%% see rocksdb:db_options() -type options() :: #{
% -type options() :: proplists:proplist(). dir => file:filename()
}.
%% see rocksdb:db_options()
-type db_options() :: proplists:proplist().
%% see rocksdb:write_options()
-type db_write_options() :: proplists:proplist(). -type db_write_options() :: proplists:proplist().
%% see rocksdb:read_options()
-type db_read_options() :: proplists:proplist().
-type cf_refs() :: [{string(), rocksdb:cf_handle()}]. -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
@ -110,18 +117,16 @@
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
-spec start_link(emqx_ds:shard()) -> {ok, pid()}. -spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}.
start_link(Shard) -> start_link(Shard, Options) ->
gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []). gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
-spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) -> -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
{ok, gen_id()} | {error, nonmonotonic}. {ok, gen_id()} | {error, nonmonotonic}.
create_generation(Shard, Since, Config = {_Module, _Options}) -> create_generation(Shard, Since, Config = {_Module, _Options}) ->
gen_server:call(?REF(Shard), {create_generation, Since, Config}). gen_server:call(?REF(Shard), {create_generation, Since, Config}).
-spec store( -spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
) ->
ok | {error, _}. ok | {error, _}.
store(Shard, GUID, Time, Topic, Msg) -> store(Shard, GUID, Time, Topic, Msg) ->
{_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
@ -181,9 +186,9 @@ discard_iterator(Shard, ReplayID) ->
%% behaviour callbacks %% behaviour callbacks
%%================================================================================ %%================================================================================
init([Shard]) -> init({Shard, Options}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
{ok, S0} = open_db(Shard), {ok, S0} = open_db(Shard, Options),
S = ensure_current_generation(S0), S = ensure_current_generation(S0),
ok = populate_metadata(S), ok = populate_metadata(S),
{ok, S}. {ok, S}.
@ -265,16 +270,17 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
}, },
{ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}. {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
-spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}. -spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
open_db(Shard) -> open_db(Shard, Options) ->
Filename = binary_to_list(Shard), DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
DBOptions = [ DBOptions = [
{create_if_missing, true}, {create_if_missing, true},
{create_missing_column_families, true} {create_missing_column_families, true}
| emqx_ds_conf:db_options() | emqx_ds_conf:db_options()
], ],
_ = filelib:ensure_dir(DBDir),
ExistingCFs = ExistingCFs =
case rocksdb:list_column_families(Filename, DBOptions) of case rocksdb:list_column_families(DBDir, DBOptions) of
{ok, CFs} -> {ok, CFs} ->
[{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF]; [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
% DB is not present. First start % DB is not present. First start
@ -286,7 +292,7 @@ open_db(Shard) ->
{?ITERATOR_CF, ?ITERATOR_CF_OPTS} {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
| ExistingCFs | ExistingCFs
], ],
case rocksdb:open(Filename, DBOptions, ColumnFamilies) of case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} -> {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
{CFNames, _} = lists:unzip(ExistingCFs), {CFNames, _} = lists:unzip(ExistingCFs),
{ok, #s{ {ok, #s{

View File

@ -6,7 +6,7 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API: %% API:
-export([start_link/0, start_shard/1, stop_shard/1]). -export([start_link/0, start_shard/2, stop_shard/1]).
%% behaviour callbacks: %% behaviour callbacks:
-export([init/1]). -export([init/1]).
@ -25,9 +25,10 @@
start_link() -> start_link() ->
supervisor:start_link({local, ?SUP}, ?MODULE, []). supervisor:start_link({local, ?SUP}, ?MODULE, []).
-spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret(). -spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
start_shard(Shard) -> supervisor:startchild_ret().
supervisor:start_child(?SUP, shard_child_spec(Shard)). start_shard(Shard, Options) ->
supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
-spec stop_shard(emqx_ds:shard()) -> ok | {error, _}. -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
stop_shard(Shard) -> stop_shard(Shard) ->
@ -51,11 +52,12 @@ init([]) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec(). -spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
shard_child_spec(Shard) -> supervisor:child_spec().
shard_child_spec(Shard, Options) ->
#{ #{
id => Shard, id => Shard,
start => {emqx_ds_storage_layer, start_link, [Shard]}, start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
shutdown => 5_000, shutdown => 5_000,
restart => permanent, restart => permanent,
type => worker type => worker

View File

@ -33,7 +33,7 @@
%% Smoke test for opening and reopening the database %% Smoke test for opening and reopening the database
t_open(_Config) -> t_open(_Config) ->
ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD), ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD). {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
%% Smoke test of store function %% Smoke test of store function
t_store(_Config) -> t_store(_Config) ->
@ -263,7 +263,7 @@ end_per_suite(_Config) ->
init_per_testcase(TC, Config) -> init_per_testcase(TC, Config) ->
ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG), ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
{ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)), {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
Config. Config.
end_per_testcase(TC, _Config) -> end_per_testcase(TC, _Config) ->