From fb39e1eacc88df8c6f0b2660932b694f3614c132 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 16 Jan 2024 17:10:07 -0300 Subject: [PATCH] feat(ds): allow customizing the data directory The storage expectations for the RocksDB DB may be different from our usual data directory. Also, it may consume a lot more storage than other data. This allows customizing the data directory for the builtin DS storage backend. Note: if the cluster was already initialized using a directory path, changing that config will have no effect. This path is currently persisted in mnesia and used when reopening the DB. --- apps/emqx/src/emqx_persistent_message.erl | 8 ++++- apps/emqx/src/emqx_schema.erl | 30 +++++++++++++++++++ apps/emqx_conf/src/emqx_conf_schema.erl | 18 ++--------- .../src/emqx_ds_storage_layer.erl | 20 +++++++++---- 4 files changed, 53 insertions(+), 23 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 295ddd3dc..d725c9b2c 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -61,10 +61,16 @@ force_ds() -> emqx_config:get([session_persistence, force_persistence]). storage_backend(#{ - builtin := #{enable := true, n_shards := NShards, replication_factor := ReplicationFactor} + builtin := #{ + enable := true, + data_dir := DataDir, + n_shards := NShards, + replication_factor := ReplicationFactor + } }) -> #{ backend => builtin, + data_dir => DataDir, storage => {emqx_ds_storage_bitfield_lts, #{}}, n_shards => NShards, replication_factor => ReplicationFactor diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ae22db14f..7cd67089d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -94,6 +94,7 @@ non_empty_string/1, validations/0, naive_env_interpolation/1, + ensure_unicode_path/2, validate_server_ssl_opts/1, validate_tcp_keepalive/1, parse_tcp_keepalive/1 @@ -1882,6 +1883,18 @@ fields("session_storage_backend_builtin") -> default => true } )}, + {"data_dir", + sc( + string(), + #{ + desc => ?DESC(session_builtin_data_dir), + default => <<"${EMQX_DATA_DIR}">>, + importance => ?IMPORTANCE_LOW, + converter => fun(Path, Opts) -> + naive_env_interpolation(ensure_unicode_path(Path, Opts)) + end + } + )}, {"n_shards", sc( pos_integer(), @@ -3836,3 +3849,20 @@ tags_schema() -> importance => ?IMPORTANCE_LOW } ). + +ensure_unicode_path(undefined, _) -> + undefined; +ensure_unicode_path(Path, #{make_serializable := true}) -> + %% format back to serializable string + unicode:characters_to_binary(Path, utf8); +ensure_unicode_path(Path, Opts) when is_binary(Path) -> + case unicode:characters_to_list(Path, utf8) of + {R, _, _} when R =:= error orelse R =:= incomplete -> + throw({"bad_file_path_string", Path}); + PathStr -> + ensure_unicode_path(PathStr, Opts) + end; +ensure_unicode_path(Path, _) when is_list(Path) -> + Path; +ensure_unicode_path(Path, _) -> + throw({"not_string", Path}). diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index abb2e14e3..04f19b95f 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -1432,22 +1432,8 @@ convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10); convert_rotation(Count, _Opts) when is_integer(Count) -> Count; convert_rotation(Count, _Opts) -> throw({"bad_rotation", Count}). -ensure_unicode_path(undefined, _) -> - undefined; -ensure_unicode_path(Path, #{make_serializable := true}) -> - %% format back to serializable string - unicode:characters_to_binary(Path, utf8); -ensure_unicode_path(Path, Opts) when is_binary(Path) -> - case unicode:characters_to_list(Path, utf8) of - {R, _, _} when R =:= error orelse R =:= incomplete -> - throw({"bad_file_path_string", Path}); - PathStr -> - ensure_unicode_path(PathStr, Opts) - end; -ensure_unicode_path(Path, _) when is_list(Path) -> - Path; -ensure_unicode_path(Path, _) -> - throw({"not_string", Path}). +ensure_unicode_path(Path, Opts) -> + emqx_schema:ensure_unicode_path(Path, Opts). log_level() -> hoconsc:enum([debug, info, notice, warning, error, critical, alert, emergency, all]). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index ab64005b6..d44235924 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -34,7 +34,7 @@ -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([db_dir/1]). +-export([db_dir/2]). -export_type([ gen_id/0, @@ -168,7 +168,13 @@ open_shard(Shard, Options) -> -spec drop_shard(shard_id()) -> ok. drop_shard(Shard) -> catch emqx_ds_storage_layer_sup:stop_shard(Shard), - ok = rocksdb:destroy(db_dir(Shard), []). + case persistent_term:get({?MODULE, Shard, data_dir}, undefined) of + undefined -> + ok; + BaseDir -> + ok = rocksdb:destroy(db_dir(BaseDir, Shard), []), + persistent_term:erase({?MODULE, Shard, base_dir}) + end. -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). @@ -424,7 +430,8 @@ rocksdb_open(Shard, Options) -> {create_missing_column_families, true} | maps:get(db_options, Options, []) ], - DBDir = db_dir(Shard), + DataDir = maps:get(data_dir, Options, emqx:data_dir()), + DBDir = db_dir(DataDir, Shard), _ = filelib:ensure_dir(DBDir), ExistingCFs = case rocksdb:list_column_families(DBDir, DBOptions) of @@ -440,15 +447,16 @@ rocksdb_open(Shard, Options) -> ], case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of {ok, DBHandle, [_CFDefault | CFRefs]} -> + persistent_term:put({?MODULE, Shard, data_dir}, DataDir), {CFNames, _} = lists:unzip(ExistingCFs), {ok, DBHandle, lists:zip(CFNames, CFRefs)}; Error -> Error end. --spec db_dir(shard_id()) -> file:filename(). -db_dir({DB, ShardId}) -> - filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]). +-spec db_dir(file:filename(), shard_id()) -> file:filename(). +db_dir(BaseDir, {DB, ShardId}) -> + filename:join([BaseDir, atom_to_list(DB), binary_to_list(ShardId)]). -spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). update_last_until(Schema, Until) ->