fix(dsraft): use shard readiness as criterion for reads availability

This commit is contained in:
Andrew Mayorov 2024-08-01 19:04:33 +02:00
parent 10dadbad3b
commit 5b15886836
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 16 additions and 12 deletions

View File

@ -479,10 +479,10 @@ shards_of_batch(_DB, [], Acc) ->
%% TODO
%% There's a possibility of race condition: storage may shut down right after we
%% ask for its status.
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
running -> EXPR;
down -> {error, recoverable, storage_down}
-define(IF_SHARD_READY(DB, SHARD, EXPR),
case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
true -> EXPR;
false -> {error, recoverable, shard_unavailable}
end
).
@ -525,8 +525,9 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
).
@ -552,8 +553,9 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
).
@ -587,8 +589,9 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(DB, Shard, Iter, BatchSize) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:next(
ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
)
@ -620,8 +623,9 @@ do_add_generation_v2(_DB) ->
| emqx_ds:error(storage_down).
do_list_generations_with_lifetimes_v3(DB, Shard) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
?IF_SHARD_READY(
DB,
Shard,
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
).