diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 2523d1c4f..81efa6c5b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -23,8 +23,10 @@ %% Management API: -export([ - base_dir/0, + register_backend/2, + open_db/2, + close_db/1, update_db_config/2, add_generation/1, list_generations_with_lifetimes/1, @@ -199,6 +201,8 @@ -callback open_db(db(), create_db_opts()) -> ok | {error, _}. +-callback close_db(db()) -> ok. + -callback add_generation(db()) -> ok | {error, _}. -callback update_db_config(db(), create_db_opts()) -> ok | {error, _}. @@ -247,21 +251,26 @@ %% API functions %%================================================================================ --spec base_dir() -> file:filename(). -base_dir() -> - application:get_env(?APP, db_data_dir, emqx:data_dir()). +%% @doc Register DS backend. +-spec register_backend(atom(), module()) -> ok. +register_backend(Name, Module) -> + persistent_term:put({emqx_ds_backend_module, Name}, Module). %% @doc Different DBs are completely independent from each other. They %% could represent something like different tenants. -spec open_db(db(), create_db_opts()) -> ok. -open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backend =:= fdb -> - Module = - case Backend of - builtin -> emqx_ds_replication_layer; - fdb -> emqx_fdb_ds - end, - persistent_term:put(?persistent_term(DB), Module), - ?module(DB):open_db(DB, Opts). +open_db(DB, Opts = #{backend := Backend}) -> + case persistent_term:get({emqx_ds_backend_module, Backend}, undefined) of + undefined -> + error({no_such_backend, Backend}); + Module -> + persistent_term:put(?persistent_term(DB), Module), + ?module(DB):open_db(DB, Opts) + end. + +-spec close_db(db()) -> ok. +close_db(DB) -> + ?module(DB):close_db(DB). -spec add_generation(db()) -> ok. add_generation(DB) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index b6161d956..3ca2dcefd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -55,7 +55,7 @@ -export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([db_dir/1]). +-export([db_dir/1, base_dir/0]). -export_type([ gen_id/0, @@ -889,13 +889,17 @@ rocksdb_open(Shard, Options) -> Error end. +-spec base_dir() -> file:filename(). +base_dir() -> + application:get_env(?APP, db_data_dir, emqx:data_dir()). + -spec db_dir(shard_id()) -> file:filename(). db_dir({DB, ShardId}) -> - filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]). + filename:join([base_dir(), DB, binary_to_list(ShardId)]). -spec checkpoints_dir(shard_id()) -> file:filename(). checkpoints_dir({DB, ShardId}) -> - filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId)]). + filename:join([base_dir(), DB, checkpoints, binary_to_list(ShardId)]). -spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename(). checkpoint_dir(ShardId, Name) ->