fix(dsrepl): handle RPC errors gracefully when storage is down

This commit is contained in:
Andrew Mayorov 2024-03-26 17:13:38 +01:00
parent 99ea63b25a
commit fa66a640c3
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 108 additions and 53 deletions

View File

@ -181,12 +181,19 @@ list_generations_with_lifetimes(DB) ->
Shards = list_shards(DB),
lists:foldl(
fun(Shard, GensAcc) ->
case ra_list_generations_with_lifetimes(DB, Shard) of
Gens = #{} ->
ok;
{error, _Class, _Reason} ->
%% TODO: log error
Gens = #{}
end,
maps:fold(
fun(GenId, Data, AccInner) ->
AccInner#{{Shard, GenId} => Data}
end,
GensAcc,
ra_list_generations_with_lifetimes(DB, Shard)
Gens
)
end,
#{},
@ -221,13 +228,12 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards = list_shards(DB),
lists:flatmap(
fun(Shard) ->
Streams =
try
ra_get_streams(DB, Shard, TopicFilter, StartTime)
catch
error:{erpc, _} ->
%% TODO: log?
[]
case ra_get_streams(DB, Shard, TopicFilter, StartTime) of
Streams when is_list(Streams) ->
ok;
{error, _Class, _Reason} ->
%% TODO: log error
Streams = []
end,
lists:map(
fun({RankY, StorageLayerStream}) ->
@ -262,14 +268,11 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
emqx_ds:make_iterator_result(iterator()).
make_iterator(DB, Stream, TopicFilter, StartTime) ->
?stream_v2(Shard, StorageStream) = Stream,
try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Error = {error, _, _} ->
Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end.
-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@ -279,22 +282,19 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} ->
{ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} ->
Err
Error = {error, _, _} ->
Error
end.
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, OldIter, DSKey) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
{ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Error = {error, _, _} ->
Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
@ -312,12 +312,8 @@ next(DB, Iter0, BatchSize) ->
{ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter},
{ok, Iter, Batch};
Ok = {ok, _} ->
Ok;
Error = {error, _, _} ->
Error;
RPCError = {badrpc, _} ->
{error, recoverable, RPCError}
Other ->
Other
end.
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) ->
@ -354,6 +350,19 @@ foreach_shard(DB, Fun) ->
%% Internal exports (RPC targets)
%%================================================================================
%% NOTE
%% Target node may still be in the process of starting up when RPCs arrive, it's
%% good to have them handled gracefully.
%% TODO
%% There's a possibility of race condition: storage may shut down right after we
%% ask for its status.
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
running -> EXPR;
down -> {error, recoverable, storage_down}
end
).
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
do_drop_db_v1(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
@ -386,11 +395,18 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
error(obsolete_api).
-spec do_get_streams_v2(
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds:topic_filter(),
emqx_ds:time()
) ->
[{integer(), emqx_ds_storage_layer:stream()}].
[{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
).
-dialyzer({nowarn_function, do_make_iterator_v1/5}).
-spec do_make_iterator_v1(
@ -413,7 +429,11 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
).
-spec do_make_delete_iterator_v4(
emqx_ds:db(),
@ -434,9 +454,7 @@ do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) ->
) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
emqx_ds_storage_layer:update_iterator(
{DB, Shard}, OldIter, DSKey
).
emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey).
-spec do_next_v1(
emqx_ds:db(),
@ -446,7 +464,11 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(DB, Shard, Iter, BatchSize) ->
emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize).
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
).
-spec do_delete_next_v4(
emqx_ds:db(),
@ -464,9 +486,14 @@ do_add_generation_v2(_DB) ->
error(obsolete_api).
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}.
do_list_generations_with_lifetimes_v3(DB, ShardId) ->
emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}).
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}
| emqx_ds:error(storage_down).
do_list_generations_with_lifetimes_v3(DB, Shard) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
).
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
ok | {error, _}.
@ -491,6 +518,15 @@ list_nodes() ->
%% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(EXPR),
try
EXPR
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end
).
ra_store_batch(DB, Shard, Messages) ->
Command = #{
?tag => ?BATCH,
@ -544,28 +580,34 @@ ra_drop_generation(DB, Shard, GenId) ->
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(Time),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs).
?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(StartTime),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs).
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of
RPCError = {badrpc, _} ->
{error, recoverable, RPCError};
Other ->
Other
end.
ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@ -573,7 +615,8 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard),
case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
Gens = #{} ->
maps:map(
fun(_GenId, Data = #{since := Since, until := Until}) ->
Data#{
@ -582,7 +625,10 @@ ra_list_generations_with_lifetimes(DB, Shard) ->
}
end,
Gens
).
);
Error ->
Error
end.
ra_drop_shard(DB, Shard) ->
ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).

View File

@ -21,6 +21,7 @@
-export([
open_shard/2,
drop_shard/1,
shard_info/2,
store_batch/3,
get_streams/3,
get_delete_streams/3,
@ -436,6 +437,14 @@ list_generations_with_lifetimes(ShardId) ->
drop_generation(ShardId, GenId) ->
gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity).
-spec shard_info(shard_id(), status) -> running | down.
shard_info(ShardId, status) ->
try get_schema_runtime(ShardId) of
#{} -> running
catch
error:badarg -> down
end.
%%================================================================================
%% gen_server for the shard
%%================================================================================