wip: catch and mask badrpcs in the read path

This commit is contained in:
Andrew Mayorov 2024-02-15 16:51:52 +01:00
parent 58bd42bfc1
commit 0e832db1d4
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 61 additions and 8 deletions

View File

@ -208,6 +208,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
?SLOG(debug, #{
msg => new_stream, key => Key, stream => Stream
}),
%% TODO: It's unlikely to fail but it's still possible.
{ok, Iterator} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),

View File

@ -381,6 +381,10 @@ list_nodes() ->
%%
-define(SAFERPC(SERVER, EXPR), make_safe_rpc(SERVER, fun() -> EXPR end)).
-define(SAFERPC(SERVER, EXPR, RET), make_safe_rpc(SERVER, fun() -> EXPR end, RET)).
-define(GENRPC(SERVER, EXPR), make_gen_rpc(SERVER, fun() -> EXPR end)).
ra_store_batch(DB, Shard, Messages) ->
Command = #{
?tag => ?BATCH,
@ -425,20 +429,27 @@ ra_drop_generation(DB, Shard, GenId) ->
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(
Server,
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time),
[]
).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(
Server,
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime)
).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?SAFERPC(Server, emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
?GENRPC(Server, emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize)).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@ -448,6 +459,47 @@ ra_drop_shard(DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
ra:force_delete_server(_System = default, LocalServer).
make_safe_rpc(Server, Fun) ->
try
Fun()
catch
C:Reason when C == error orelse C == exit ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
exception => C,
context => Reason
}),
{error, Reason}
end.
make_safe_rpc(Server, Fun, Ret) ->
try
Fun()
catch
C:Reason when C == error orelse C == exit ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
exception => C,
context => Reason
}),
Ret
end.
make_gen_rpc(Server, Fun) ->
case Fun() of
{badrpc, Reason} ->
_ = logger:warning(#{
msg => "RPC failed",
server => Server,
context => Reason
}),
{error, Reason};
Ret ->
Ret
end.
%%
init(#{db := DB, shard := Shard}) ->