Merge pull request #12400 from thalesmg/ds-fix-data-dir-schema-m-20240125

fix(ds): don't make data dir part of the schema
This commit is contained in:
Thales Macedo Garitezi 2024-01-25 15:52:11 -03:00 committed by GitHub
commit 590746ddfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 17 deletions

View File

@ -61,16 +61,14 @@ force_ds() ->
emqx_config:get([session_persistence, force_persistence]). emqx_config:get([session_persistence, force_persistence]).
storage_backend(#{ storage_backend(#{
builtin := Opts = #{ builtin := #{
enable := true, enable := true,
n_shards := NShards, n_shards := NShards,
replication_factor := ReplicationFactor replication_factor := ReplicationFactor
} }
}) -> }) ->
DataDir = maps:get(data_dir, Opts, emqx:data_dir()),
#{ #{
backend => builtin, backend => builtin,
data_dir => DataDir,
storage => {emqx_ds_storage_bitfield_lts, #{}}, storage => {emqx_ds_storage_bitfield_lts, #{}},
n_shards => NShards, n_shards => NShards,
replication_factor => ReplicationFactor replication_factor => ReplicationFactor

View File

@ -1896,6 +1896,7 @@ fields("session_storage_backend_builtin") ->
string(), string(),
#{ #{
desc => ?DESC(session_builtin_data_dir), desc => ?DESC(session_builtin_data_dir),
mapping => "emqx_durable_storage.db_data_dir",
required => false, required => false,
importance => ?IMPORTANCE_LOW importance => ?IMPORTANCE_LOW
} }

View File

@ -36,7 +36,7 @@
-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
%% internal exports: %% internal exports:
-export([db_dir/2]). -export([db_dir/1]).
-export_type([ -export_type([
gen_id/0, gen_id/0,
@ -52,6 +52,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(APP, emqx_durable_storage).
-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
%%================================================================================ %%================================================================================
@ -199,13 +200,7 @@ open_shard(Shard, Options) ->
-spec drop_shard(shard_id()) -> ok. -spec drop_shard(shard_id()) -> ok.
drop_shard(Shard) -> drop_shard(Shard) ->
case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of ok = rocksdb:destroy(db_dir(Shard), []).
undefined ->
ok;
BaseDir ->
ok = rocksdb:destroy(db_dir(BaseDir, Shard), []),
persistent_term:erase({?MODULE, Shard, base_dir})
end.
-spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
emqx_ds:store_batch_result(). emqx_ds:store_batch_result().
@ -589,8 +584,7 @@ rocksdb_open(Shard, Options) ->
{enable_write_thread_adaptive_yield, false} {enable_write_thread_adaptive_yield, false}
| maps:get(db_options, Options, []) | maps:get(db_options, Options, [])
], ],
DataDir = maps:get(data_dir, Options, emqx:data_dir()), DBDir = db_dir(Shard),
DBDir = db_dir(DataDir, Shard),
_ = filelib:ensure_dir(DBDir), _ = filelib:ensure_dir(DBDir),
ExistingCFs = ExistingCFs =
case rocksdb:list_column_families(DBDir, DBOptions) of case rocksdb:list_column_families(DBDir, DBOptions) of
@ -606,16 +600,19 @@ rocksdb_open(Shard, Options) ->
], ],
case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
{ok, DBHandle, [_CFDefault | CFRefs]} -> {ok, DBHandle, [_CFDefault | CFRefs]} ->
persistent_term:put({?MODULE, Shard, data_dir}, DataDir),
{CFNames, _} = lists:unzip(ExistingCFs), {CFNames, _} = lists:unzip(ExistingCFs),
{ok, DBHandle, lists:zip(CFNames, CFRefs)}; {ok, DBHandle, lists:zip(CFNames, CFRefs)};
Error -> Error ->
Error Error
end. end.
-spec db_dir(file:filename(), shard_id()) -> file:filename(). -spec db_dir(shard_id()) -> file:filename().
db_dir(BaseDir, {DB, ShardId}) -> db_dir({DB, ShardId}) ->
filename:join([BaseDir, atom_to_list(DB), binary_to_list(ShardId)]). filename:join([base_dir(), atom_to_list(DB), binary_to_list(ShardId)]).
-spec base_dir() -> file:filename().
base_dir() ->
application:get_env(?APP, db_data_dir, emqx:data_dir()).
-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard().
update_last_until(Schema, Until) -> update_last_until(Schema, Until) ->