feat(ds): clear all checkpoints when (re)starting storage layer
Fixes https://emqx.atlassian.net/browse/EMQX-12143
This commit is contained in:
parent
069cd4fbb4
commit
c57c36adb2
|
@ -500,6 +500,7 @@ init({ShardId, Options}) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
|
logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
|
||||||
erase_schema_runtime(ShardId),
|
erase_schema_runtime(ShardId),
|
||||||
|
clear_all_checkpoints(ShardId),
|
||||||
{ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
|
{ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
|
||||||
{Schema, CFRefs} =
|
{Schema, CFRefs} =
|
||||||
case get_schema_persistent(DB) of
|
case get_schema_persistent(DB) of
|
||||||
|
@ -567,6 +568,23 @@ terminate(_Reason, #s{db = DB, shard_id = ShardId}) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec clear_all_checkpoints(shard_id()) -> ok.
|
||||||
|
clear_all_checkpoints(ShardId) ->
|
||||||
|
CheckpointBaseDir = checkpoints_dir(ShardId),
|
||||||
|
ok = filelib:ensure_path(CheckpointBaseDir),
|
||||||
|
{ok, AllFiles} = file:list_dir(CheckpointBaseDir),
|
||||||
|
CheckpointDirs = [Dir || Dir <- AllFiles, filelib:is_dir(Dir)],
|
||||||
|
lists:foreach(
|
||||||
|
fun(Dir) ->
|
||||||
|
logger:debug(#{
|
||||||
|
msg => "ds_storage_deleting_previous_checkpoint",
|
||||||
|
dir => Dir
|
||||||
|
}),
|
||||||
|
ok = file:del_dir_r(Dir)
|
||||||
|
end,
|
||||||
|
CheckpointDirs
|
||||||
|
).
|
||||||
|
|
||||||
-spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) ->
|
-spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) ->
|
||||||
shard().
|
shard().
|
||||||
open_shard(ShardId, DB, CFRefs, ShardSchema) ->
|
open_shard(ShardId, DB, CFRefs, ShardSchema) ->
|
||||||
|
@ -777,9 +795,13 @@ rocksdb_open(Shard, Options) ->
|
||||||
db_dir({DB, ShardId}) ->
|
db_dir({DB, ShardId}) ->
|
||||||
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
|
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
|
||||||
|
|
||||||
|
-spec checkpoints_dir(shard_id()) -> file:filename().
|
||||||
|
checkpoints_dir({DB, ShardId}) ->
|
||||||
|
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId)]).
|
||||||
|
|
||||||
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
|
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
|
||||||
checkpoint_dir({DB, ShardId}, Name) ->
|
checkpoint_dir(ShardId, Name) ->
|
||||||
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]).
|
filename:join([checkpoints_dir(ShardId), Name]).
|
||||||
|
|
||||||
-spec update_last_until(Schema, emqx_ds:time()) ->
|
-spec update_last_until(Schema, emqx_ds:time()) ->
|
||||||
Schema | {error, exists | overlaps_existing_generations}
|
Schema | {error, exists | overlaps_existing_generations}
|
||||||
|
|
Loading…
Reference in New Issue