From c57c36adb222d90b3c46cf66280be8e8565a6d6c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 4 Apr 2024 14:05:12 -0300 Subject: [PATCH] feat(ds): clear all checkpoints when (re)starting storage layer Fixes https://emqx.atlassian.net/browse/EMQX-12143 --- .../src/emqx_ds_storage_layer.erl | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 28ce1d943..4981c3fc1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -500,6 +500,7 @@ init({ShardId, Options}) -> process_flag(trap_exit, true), logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), erase_schema_runtime(ShardId), + clear_all_checkpoints(ShardId), {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options), {Schema, CFRefs} = case get_schema_persistent(DB) of @@ -567,6 +568,23 @@ terminate(_Reason, #s{db = DB, shard_id = ShardId}) -> %% Internal functions %%================================================================================ +-spec clear_all_checkpoints(shard_id()) -> ok. +clear_all_checkpoints(ShardId) -> + CheckpointBaseDir = checkpoints_dir(ShardId), + ok = filelib:ensure_path(CheckpointBaseDir), + {ok, AllFiles} = file:list_dir(CheckpointBaseDir), + CheckpointDirs = [Dir || Dir <- AllFiles, filelib:is_dir(Dir)], + lists:foreach( + fun(Dir) -> + logger:debug(#{ + msg => "ds_storage_deleting_previous_checkpoint", + dir => Dir + }), + ok = file:del_dir_r(Dir) + end, + CheckpointDirs + ). + -spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) -> shard(). open_shard(ShardId, DB, CFRefs, ShardSchema) -> @@ -777,9 +795,13 @@ rocksdb_open(Shard, Options) -> db_dir({DB, ShardId}) -> filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]). +-spec checkpoints_dir(shard_id()) -> file:filename(). +checkpoints_dir({DB, ShardId}) -> + filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId)]). + -spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename(). -checkpoint_dir({DB, ShardId}, Name) -> - filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]). +checkpoint_dir(ShardId, Name) -> + filename:join([checkpoints_dir(ShardId), Name]). -spec update_last_until(Schema, emqx_ds:time()) -> Schema | {error, exists | overlaps_existing_generations}