From 8e31afe6c2f8db8d3d21d591b275f21037e017af Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 25 Jan 2024 13:44:27 -0300 Subject: [PATCH] fix(ds): don't make data dir part of the schema The data directory was ending up being persisted in the database schema. This led to issues when opening the DB on different nodes. --- apps/emqx/src/emqx_persistent_message.erl | 4 +-- apps/emqx/src/emqx_schema.erl | 1 + .../src/emqx_ds_storage_layer.erl | 25 ++++++++----------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index effad17dd..b178a742c 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -61,16 +61,14 @@ force_ds() -> emqx_config:get([session_persistence, force_persistence]). storage_backend(#{ - builtin := Opts = #{ + builtin := #{ enable := true, n_shards := NShards, replication_factor := ReplicationFactor } }) -> - DataDir = maps:get(data_dir, Opts, emqx:data_dir()), #{ backend => builtin, - data_dir => DataDir, storage => {emqx_ds_storage_bitfield_lts, #{}}, n_shards => NShards, replication_factor => ReplicationFactor diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index b03cfe72e..afbe2cfa7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1896,6 +1896,7 @@ fields("session_storage_backend_builtin") -> string(), #{ desc => ?DESC(session_builtin_data_dir), + mapping => "emqx_durable_storage.db_data_dir", required => false, importance => ?IMPORTANCE_LOW } diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 85a94a846..8f4b2afc6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -36,7 +36,7 @@ -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([db_dir/2]). +-export([db_dir/1]). -export_type([ gen_id/0, @@ -52,6 +52,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(APP, emqx_durable_storage). -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). %%================================================================================ @@ -199,13 +200,7 @@ open_shard(Shard, Options) -> -spec drop_shard(shard_id()) -> ok. drop_shard(Shard) -> - case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of - undefined -> - ok; - BaseDir -> - ok = rocksdb:destroy(db_dir(BaseDir, Shard), []), - persistent_term:erase({?MODULE, Shard, base_dir}) - end. + ok = rocksdb:destroy(db_dir(Shard), []). -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). @@ -589,8 +584,7 @@ rocksdb_open(Shard, Options) -> {enable_write_thread_adaptive_yield, false} | maps:get(db_options, Options, []) ], - DataDir = maps:get(data_dir, Options, emqx:data_dir()), - DBDir = db_dir(DataDir, Shard), + DBDir = db_dir(Shard), _ = filelib:ensure_dir(DBDir), ExistingCFs = case rocksdb:list_column_families(DBDir, DBOptions) of @@ -606,16 +600,19 @@ rocksdb_open(Shard, Options) -> ], case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of {ok, DBHandle, [_CFDefault | CFRefs]} -> - persistent_term:put({?MODULE, Shard, data_dir}, DataDir), {CFNames, _} = lists:unzip(ExistingCFs), {ok, DBHandle, lists:zip(CFNames, CFRefs)}; Error -> Error end. --spec db_dir(file:filename(), shard_id()) -> file:filename(). -db_dir(BaseDir, {DB, ShardId}) -> - filename:join([BaseDir, atom_to_list(DB), binary_to_list(ShardId)]). +-spec db_dir(shard_id()) -> file:filename(). +db_dir({DB, ShardId}) -> + filename:join([base_dir(), atom_to_list(DB), binary_to_list(ShardId)]). + +-spec base_dir() -> file:filename(). +base_dir() -> + application:get_env(?APP, db_data_dir, emqx:data_dir()). -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). update_last_until(Schema, Until) ->