diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 11c809dbd..42b4b87b9 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) -> %% TODO %% There's a possibility of race condition: storage may shut down right after we %% ask for its status. --define(IF_STORAGE_RUNNING(SHARDID, EXPR), - case emqx_ds_storage_layer:shard_info(SHARDID, status) of - running -> EXPR; - down -> {error, recoverable, storage_down} +-define(IF_SHARD_READY(DB, SHARD, EXPR), + case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of + true -> EXPR; + false -> {error, recoverable, shard_unavailable} end ). @@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) -> [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down). do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> ShardId = {DB, Shard}, - ?IF_STORAGE_RUNNING( - ShardId, + ?IF_SHARD_READY( + DB, + Shard, emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime) ). @@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> ShardId = {DB, Shard}, - ?IF_STORAGE_RUNNING( - ShardId, + ?IF_SHARD_READY( + DB, + Shard, emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime) ). @@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> emqx_ds:next_result(emqx_ds_storage_layer:iterator()). do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, - ?IF_STORAGE_RUNNING( - ShardId, + ?IF_SHARD_READY( + DB, + Shard, emqx_ds_storage_layer:next( ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) ) @@ -620,8 +623,9 @@ do_add_generation_v2(_DB) -> | emqx_ds:error(storage_down). do_list_generations_with_lifetimes_v3(DB, Shard) -> ShardId = {DB, Shard}, - ?IF_STORAGE_RUNNING( - ShardId, + ?IF_SHARD_READY( + DB, + Shard, emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId) ).