wip(dsrepl): handle RPC errors gracefully

This commit is contained in:
Andrew Mayorov 2024-03-26 17:13:38 +01:00
parent 663ea69574
commit 534e177e7c
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 100 additions and 46 deletions

View File

@ -202,12 +202,19 @@ list_generations_with_lifetimes(DB) ->
Shards = list_shards(DB), Shards = list_shards(DB),
lists:foldl( lists:foldl(
fun(Shard, GensAcc) -> fun(Shard, GensAcc) ->
case ra_list_generations_with_lifetimes(DB, Shard) of
Gens = #{} ->
ok;
{error, _Class, _Reason} ->
%% TODO: log error
Gens = #{}
end,
maps:fold( maps:fold(
fun(GenId, Data, AccInner) -> fun(GenId, Data, AccInner) ->
AccInner#{{Shard, GenId} => Data} AccInner#{{Shard, GenId} => Data}
end, end,
GensAcc, GensAcc,
ra_list_generations_with_lifetimes(DB, Shard) Gens
) )
end, end,
#{}, #{},
@ -242,13 +249,12 @@ get_streams(DB, TopicFilter, StartTime) ->
Shards = list_shards(DB), Shards = list_shards(DB),
lists:flatmap( lists:flatmap(
fun(Shard) -> fun(Shard) ->
Streams = case ra_get_streams(DB, Shard, TopicFilter, StartTime) of
try Streams when is_list(Streams) ->
ra_get_streams(DB, Shard, TopicFilter, StartTime) ok;
catch {error, _Class, _Reason} ->
error:{erpc, _} -> %% TODO: log error
%% TODO: log? Streams = []
[]
end, end,
lists:map( lists:map(
fun({RankY, StorageLayerStream}) -> fun({RankY, StorageLayerStream}) ->
@ -283,14 +289,11 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
make_iterator(DB, Stream, TopicFilter, StartTime) -> make_iterator(DB, Stream, TopicFilter, StartTime) ->
?stream_v2(Shard, StorageStream) = Stream, ?stream_v2(Shard, StorageStream) = Stream,
try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Error = {error, _, _} -> Error = {error, _, _} ->
Error Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end. end.
-spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> -spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
@ -300,22 +303,19 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}};
Err = {error, _} -> Error = {error, _, _} ->
Err Error
end. end.
-spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) -> -spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) ->
emqx_ds:make_iterator_result(iterator()). emqx_ds:make_iterator_result(iterator()).
update_iterator(DB, OldIter, DSKey) -> update_iterator(DB, OldIter, DSKey) ->
#{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
try ra_update_iterator(DB, Shard, StorageIter, DSKey) of case ra_update_iterator(DB, Shard, StorageIter, DSKey) of
{ok, Iter} -> {ok, Iter} ->
{ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
Error = {error, _, _} -> Error = {error, _, _} ->
Error Error
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end. end.
-spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
@ -375,6 +375,19 @@ foreach_shard(DB, Fun) ->
%% Internal exports (RPC targets) %% Internal exports (RPC targets)
%%================================================================================ %%================================================================================
%% NOTE
%% Target node may still be in the process of starting up when RPCs arrive, it's
%% good to have them handled gracefully.
%% TODO
%% There's a possibility of race condition: storage may shut down right after we
%% ask for its status.
-define(IF_STORAGE_RUNNING(SHARDID, EXPR),
case emqx_ds_storage_layer:shard_info(SHARDID, status) of
running -> EXPR;
down -> {error, recoverable, storage_down}
end
).
-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. -spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}.
do_drop_db_v1(DB) -> do_drop_db_v1(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_shards(DB), MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
@ -406,11 +419,18 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
error(obsolete_api). error(obsolete_api).
-spec do_get_streams_v2( -spec do_get_streams_v2(
emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() emqx_ds:db(),
emqx_ds_replication_layer:shard_id(),
emqx_ds:topic_filter(),
emqx_ds:time()
) -> ) ->
[{integer(), emqx_ds_storage_layer:stream()}]. [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down).
do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime). ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
).
-dialyzer({nowarn_function, do_make_iterator_v1/5}). -dialyzer({nowarn_function, do_make_iterator_v1/5}).
-spec do_make_iterator_v1( -spec do_make_iterator_v1(
@ -433,7 +453,11 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
) -> ) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
).
-spec do_make_delete_iterator_v4( -spec do_make_delete_iterator_v4(
emqx_ds:db(), emqx_ds:db(),
@ -454,9 +478,7 @@ do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) ->
) -> ) ->
emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()).
do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
emqx_ds_storage_layer:update_iterator( emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey).
{DB, Shard}, OldIter, DSKey
).
-spec do_next_v1( -spec do_next_v1(
emqx_ds:db(), emqx_ds:db(),
@ -466,7 +488,11 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
) -> ) ->
emqx_ds:next_result(emqx_ds_storage_layer:iterator()). emqx_ds:next_result(emqx_ds_storage_layer:iterator()).
do_next_v1(DB, Shard, Iter, BatchSize) -> do_next_v1(DB, Shard, Iter, BatchSize) ->
emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:next(ShardId, Iter, BatchSize)
).
-spec do_delete_next_v4( -spec do_delete_next_v4(
emqx_ds:db(), emqx_ds:db(),
@ -484,9 +510,14 @@ do_add_generation_v2(_DB) ->
error(obsolete_api). error(obsolete_api).
-spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) -> -spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) ->
#{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}. #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}
do_list_generations_with_lifetimes_v3(DB, ShardId) -> | emqx_ds:error(storage_down).
emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}). do_list_generations_with_lifetimes_v3(DB, Shard) ->
ShardId = {DB, Shard},
?IF_STORAGE_RUNNING(
ShardId,
emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
).
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) -> -spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) ->
no_return(). no_return().
@ -511,6 +542,15 @@ list_nodes() ->
%% Too large for normal operation, need better backpressure mechanism. %% Too large for normal operation, need better backpressure mechanism.
-define(RA_TIMEOUT, 60 * 1000). -define(RA_TIMEOUT, 60 * 1000).
-define(SAFERPC(EXPR),
try
EXPR
catch
error:RPCError = {erpc, _} ->
{error, recoverable, RPCError}
end
).
ra_store_batch(DB, Shard, Messages) -> ra_store_batch(DB, Shard, Messages) ->
Command = #{ Command = #{
?tag => ?BATCH, ?tag => ?BATCH,
@ -564,24 +604,25 @@ ra_drop_generation(DB, Shard, GenId) ->
ra_get_streams(DB, Shard, TopicFilter, Time) -> ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(Time), TimestampUs = timestamp_to_timeus(Time),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs). ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
TimestampUs = timestamp_to_timeus(StartTime), TimeUs = timestamp_to_timeus(StartTime),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs). ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). TimeUs = timestamp_to_timeus(StartTime),
?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)).
ra_update_iterator(DB, Shard, Iter, DSKey) -> ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
ra_next(DB, Shard, Iter, BatchSize) -> ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@ -593,7 +634,8 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
ra_list_generations_with_lifetimes(DB, Shard) -> ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard), case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of
Gens = #{} ->
maps:map( maps:map(
fun(_GenId, Data = #{since := Since, until := Until}) -> fun(_GenId, Data = #{since := Since, until := Until}) ->
Data#{ Data#{
@ -602,7 +644,10 @@ ra_list_generations_with_lifetimes(DB, Shard) ->
} }
end, end,
Gens Gens
). );
Error ->
Error
end.
ra_drop_shard(DB, Shard) -> ra_drop_shard(DB, Shard) ->
ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT). ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT).

View File

@ -22,6 +22,7 @@
%% Lifecycle %% Lifecycle
start_link/2, start_link/2,
drop_shard/1, drop_shard/1,
shard_info/2,
%% Data %% Data
store_batch/3, store_batch/3,
@ -458,6 +459,14 @@ accept_snapshot(ShardId) ->
ok = drop_shard(ShardId), ok = drop_shard(ShardId),
handle_accept_snapshot(ShardId). handle_accept_snapshot(ShardId).
-spec shard_info(shard_id(), status) -> running | down.
shard_info(ShardId, status) ->
try get_schema_runtime(ShardId) of
#{} -> running
catch
error:badarg -> down
end.
%%================================================================================ %%================================================================================
%% gen_server for the shard %% gen_server for the shard
%%================================================================================ %%================================================================================