Log and contain failures of durable-storage shard RPCs.

In emqx_ds_replication_layer, ra_get_streams/4, ra_make_iterator/5 and
ra_update_iterator/4 now run through make_safe_rpc, which catches `error`
and `exit` exceptions, logs a warning and returns `{error, Reason}`
(`[]` for ra_get_streams). ra_next/4 goes through make_gen_rpc, which logs a
`{badrpc, Reason}` result and maps it to `{error, Reason}`.
The session stream scheduler only gains a TODO noting that its
`{ok, Iterator}` match on emqx_ds:make_iterator can still fail.

NOTE(review): line breaks, blank lines and indentation were reconstructed
from a whitespace-collapsed copy of this patch; hunk line counts match the
hunk headers, but verify the context against the target tree before applying.

diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl
index 286d32ef4..486508f37 100644
--- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl
@@ -208,6 +208,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
             ?SLOG(debug, #{
                 msg => new_stream, key => Key, stream => Stream
             }),
+            %% TODO: It's unlikely to fail but it's still possible.
             {ok, Iterator} = emqx_ds:make_iterator(
                 ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
             ),
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index cd1dd3cc1..19cf43474 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -381,6 +381,10 @@ list_nodes() ->
 
 %%
 
+-define(SAFERPC(SERVER, EXPR), make_safe_rpc(SERVER, fun() -> EXPR end)).
+-define(SAFERPC(SERVER, EXPR, RET), make_safe_rpc(SERVER, fun() -> EXPR end, RET)).
+-define(GENRPC(SERVER, EXPR), make_gen_rpc(SERVER, fun() -> EXPR end)).
+
 ra_store_batch(DB, Shard, Messages) ->
     Command = #{
         ?tag => ?BATCH,
@@ -425,20 +429,27 @@ ra_drop_generation(DB, Shard, GenId) ->
     end.
 
 ra_get_streams(DB, Shard, TopicFilter, Time) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
+    Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    ?SAFERPC(
+        Server,
+        emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time),
+        []
+    ).
 
 ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+    Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    ?SAFERPC(
+        Server,
+        emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime)
+    ).
 
 ra_update_iterator(DB, Shard, Iter, DSKey) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+    Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    ?SAFERPC(Server, emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)).
 
 ra_next(DB, Shard, Iter, BatchSize) ->
-    {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
-    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+    Server = {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
+    ?GENRPC(Server, emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize)).
 
 ra_list_generations_with_lifetimes(DB, Shard) ->
     {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred),
@@ -448,6 +459,47 @@ ra_drop_shard(DB, Shard) ->
     LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
     ra:force_delete_server(_System = default, LocalServer).
 
+make_safe_rpc(Server, Fun) ->
+    try
+        Fun()
+    catch
+        C:Reason when C == error orelse C == exit ->
+            _ = logger:warning(#{
+                msg => "RPC failed",
+                server => Server,
+                exception => C,
+                context => Reason
+            }),
+            {error, Reason}
+    end.
+
+make_safe_rpc(Server, Fun, Ret) ->
+    try
+        Fun()
+    catch
+        C:Reason when C == error orelse C == exit ->
+            _ = logger:warning(#{
+                msg => "RPC failed",
+                server => Server,
+                exception => C,
+                context => Reason
+            }),
+            Ret
+    end.
+
+make_gen_rpc(Server, Fun) ->
+    case Fun() of
+        {badrpc, Reason} ->
+            _ = logger:warning(#{
+                msg => "RPC failed",
+                server => Server,
+                context => Reason
+            }),
+            {error, Reason};
+        Ret ->
+            Ret
+    end.
+
 %%
 
 init(#{db := DB, shard := Shard}) ->