feat(ds): Dynamic backend registration

This commit is contained in:
ieQu1 2024-06-13 15:19:57 +02:00
parent 83dc8f4d77
commit 63f1856a2c
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 28 additions and 15 deletions

View File

@ -23,8 +23,10 @@
%% Management API:
-export([
base_dir/0,
register_backend/2,
open_db/2,
close_db/1,
update_db_config/2,
add_generation/1,
list_generations_with_lifetimes/1,
@ -199,6 +201,8 @@
-callback open_db(db(), create_db_opts()) -> ok | {error, _}.
-callback close_db(db()) -> ok.
-callback add_generation(db()) -> ok | {error, _}.
-callback update_db_config(db(), create_db_opts()) -> ok | {error, _}.
@ -247,21 +251,26 @@
%% API functions
%%================================================================================
-spec base_dir() -> file:filename().
base_dir() ->
application:get_env(?APP, db_data_dir, emqx:data_dir()).
%% @doc Register DS backend.
-spec register_backend(atom(), module()) -> ok.
register_backend(Name, Module) ->
persistent_term:put({emqx_ds_backend_module, Name}, Module).
%% @doc Different DBs are completely independent from each other. They
%% could represent something like different tenants.
-spec open_db(db(), create_db_opts()) -> ok.
open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backend =:= fdb ->
Module =
case Backend of
builtin -> emqx_ds_replication_layer;
fdb -> emqx_fdb_ds
end,
persistent_term:put(?persistent_term(DB), Module),
?module(DB):open_db(DB, Opts).
open_db(DB, Opts = #{backend := Backend}) ->
case persistent_term:get({emqx_ds_backend_module, Backend}, undefined) of
undefined ->
error({no_such_backend, Backend});
Module ->
persistent_term:put(?persistent_term(DB), Module),
?module(DB):open_db(DB, Opts)
end.
-spec close_db(db()) -> ok.
close_db(DB) ->
?module(DB):close_db(DB).
-spec add_generation(db()) -> ok.
add_generation(DB) ->

View File

@ -55,7 +55,7 @@
-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports:
-export([db_dir/1]).
-export([db_dir/1, base_dir/0]).
-export_type([
gen_id/0,
@ -889,13 +889,17 @@ rocksdb_open(Shard, Options) ->
Error
end.
-spec base_dir() -> file:filename().
base_dir() ->
application:get_env(?APP, db_data_dir, emqx:data_dir()).
-spec db_dir(shard_id()) -> file:filename().
db_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
filename:join([base_dir(), DB, binary_to_list(ShardId)]).
-spec checkpoints_dir(shard_id()) -> file:filename().
checkpoints_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId)]).
filename:join([base_dir(), DB, checkpoints, binary_to_list(ShardId)]).
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
checkpoint_dir(ShardId, Name) ->