From fa66a640c32e35f0a173df491027486b648fb8f3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 26 Mar 2024 17:13:38 +0100 Subject: [PATCH 1/2] fix(dsrepl): handle RPC errors gracefully when storage is down --- .../src/emqx_ds_replication_layer.erl | 152 ++++++++++++------ .../src/emqx_ds_storage_layer.erl | 9 ++ 2 files changed, 108 insertions(+), 53 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 72f142b8f..f8c4980d0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -181,12 +181,19 @@ list_generations_with_lifetimes(DB) -> Shards = list_shards(DB), lists:foldl( fun(Shard, GensAcc) -> + case ra_list_generations_with_lifetimes(DB, Shard) of + Gens = #{} -> + ok; + {error, _Class, _Reason} -> + %% TODO: log error + Gens = #{} + end, maps:fold( fun(GenId, Data, AccInner) -> AccInner#{{Shard, GenId} => Data} end, GensAcc, - ra_list_generations_with_lifetimes(DB, Shard) + Gens ) end, #{}, @@ -221,14 +228,13 @@ get_streams(DB, TopicFilter, StartTime) -> Shards = list_shards(DB), lists:flatmap( fun(Shard) -> - Streams = - try - ra_get_streams(DB, Shard, TopicFilter, StartTime) - catch - error:{erpc, _} -> - %% TODO: log? - [] - end, + case ra_get_streams(DB, Shard, TopicFilter, StartTime) of + Streams when is_list(Streams) -> + ok; + {error, _Class, _Reason} -> + %% TODO: log error + Streams = [] + end, lists:map( fun({RankY, StorageLayerStream}) -> RankX = Shard, @@ -262,14 +268,11 @@ get_delete_streams(DB, TopicFilter, StartTime) -> emqx_ds:make_iterator_result(iterator()). make_iterator(DB, Stream, TopicFilter, StartTime) -> ?stream_v2(Shard, StorageStream) = Stream, - try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of + case ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Error = {error, _, _} -> Error - catch - error:RPCError = {erpc, _} -> - {error, recoverable, RPCError} end. -spec make_delete_iterator(emqx_ds:db(), delete_stream(), emqx_ds:topic_filter(), emqx_ds:time()) -> @@ -279,22 +282,19 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; - Err = {error, _} -> - Err + Error = {error, _, _} -> + Error end. -spec update_iterator(emqx_ds:db(), iterator(), emqx_ds:message_key()) -> emqx_ds:make_iterator_result(iterator()). update_iterator(DB, OldIter, DSKey) -> #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter, - try ra_update_iterator(DB, Shard, StorageIter, DSKey) of + case ra_update_iterator(DB, Shard, StorageIter, DSKey) of {ok, Iter} -> {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}}; Error = {error, _, _} -> Error - catch - error:RPCError = {erpc, _} -> - {error, recoverable, RPCError} end. -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()). @@ -312,12 +312,8 @@ next(DB, Iter0, BatchSize) -> {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; - Ok = {ok, _} -> - Ok; - Error = {error, _, _} -> - Error; - RPCError = {badrpc, _} -> - {error, recoverable, RPCError} + Other -> + Other end. 
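Editorial note on the hunks above: the rewritten list_generations_with_lifetimes/1 and get_streams/3 bind a result variable (Gens, Streams) in every branch of a case expression and use it after the case ends; a #{} pattern matches any map, so the success clause binds the whole RPC result while the error clause falls back to an empty default. A minimal, self-contained sketch of that idiom, with an illustrative module and hypothetical fetch/1 and lookup/1 functions that are not part of the patch:

-module(case_binding_sketch).
-export([lookup/1]).

lookup(Key) ->
    %% `Value' is bound in every clause of the case, so it is usable after
    %% `end'. The pattern #{} matches any map (not only the empty one); in
    %% the error clause `Value = #{}' is an expression yielding an empty map.
    case fetch(Key) of
        Value = #{} ->
            ok;
        {error, _Class, _Reason} ->
            Value = #{}
    end,
    maps:get(Key, Value, undefined).

%% Hypothetical helper standing in for an RPC wrapper such as
%% ra_list_generations_with_lifetimes/2.
fetch(existing_key) ->
    #{existing_key => 1};
fetch(_Key) ->
    {error, recoverable, storage_down}.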
-spec delete_next(emqx_ds:db(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> @@ -354,6 +350,19 @@ foreach_shard(DB, Fun) -> %% Internal exports (RPC targets) %%================================================================================ +%% NOTE +%% Target node may still be in the process of starting up when RPCs arrive, it's +%% good to have them handled gracefully. +%% TODO +%% There's a possibility of race condition: storage may shut down right after we +%% ask for its status. +-define(IF_STORAGE_RUNNING(SHARDID, EXPR), + case emqx_ds_storage_layer:shard_info(SHARDID, status) of + running -> EXPR; + down -> {error, recoverable, storage_down} + end +). + -spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. do_drop_db_v1(DB) -> MyShards = emqx_ds_replication_layer_meta:my_shards(DB), @@ -386,11 +395,18 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) -> error(obsolete_api). -spec do_get_streams_v2( - emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time() + emqx_ds:db(), + emqx_ds_replication_layer:shard_id(), + emqx_ds:topic_filter(), + emqx_ds:time() ) -> - [{integer(), emqx_ds_storage_layer:stream()}]. + [{integer(), emqx_ds_storage_layer:stream()}] | emqx_ds:error(storage_down). do_get_streams_v2(DB, Shard, TopicFilter, StartTime) -> - emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime). + ShardId = {DB, Shard}, + ?IF_STORAGE_RUNNING( + ShardId, + emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime) + ). -dialyzer({nowarn_function, do_make_iterator_v1/5}). -spec do_make_iterator_v1( @@ -413,7 +429,11 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) -> ) -> emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) -> - emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime). + ShardId = {DB, Shard}, + ?IF_STORAGE_RUNNING( + ShardId, + emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime) + ). -spec do_make_delete_iterator_v4( emqx_ds:db(), @@ -434,9 +454,7 @@ do_make_delete_iterator_v4(DB, Shard, Stream, TopicFilter, StartTime) -> ) -> emqx_ds:make_iterator_result(emqx_ds_storage_layer:iterator()). do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> - emqx_ds_storage_layer:update_iterator( - {DB, Shard}, OldIter, DSKey - ). + emqx_ds_storage_layer:update_iterator({DB, Shard}, OldIter, DSKey). -spec do_next_v1( emqx_ds:db(), @@ -446,7 +464,11 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> ) -> emqx_ds:next_result(emqx_ds_storage_layer:iterator()). do_next_v1(DB, Shard, Iter, BatchSize) -> - emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). + ShardId = {DB, Shard}, + ?IF_STORAGE_RUNNING( + ShardId, + emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) + ). -spec do_delete_next_v4( emqx_ds:db(), @@ -464,9 +486,14 @@ do_add_generation_v2(_DB) -> error(obsolete_api). -spec do_list_generations_with_lifetimes_v3(emqx_ds:db(), shard_id()) -> - #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()}. -do_list_generations_with_lifetimes_v3(DB, ShardId) -> - emqx_ds_storage_layer:list_generations_with_lifetimes({DB, ShardId}). + #{emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()} + | emqx_ds:error(storage_down). +do_list_generations_with_lifetimes_v3(DB, Shard) -> + ShardId = {DB, Shard}, + ?IF_STORAGE_RUNNING( + ShardId, + emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId) + ). 
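The NOTE above motivates guarding every RPC target: the target node may still be booting, so the shard's storage may not be open yet when a request arrives. A hedged sketch of the same guard written as a plain higher-order function rather than a macro; with_running_storage/2 is an illustrative name, while emqx_ds_storage_layer:shard_info/2 is the helper added in the second file of this patch:

%% Sketch only: roughly what ?IF_STORAGE_RUNNING(ShardId, EXPR) evaluates
%% to, expressed as a function taking a fun instead of inlining the expression.
with_running_storage(ShardId, Fun) when is_function(Fun, 0) ->
    case emqx_ds_storage_layer:shard_info(ShardId, status) of
        running ->
            Fun();
        down ->
            %% Recoverable: the caller may retry once the shard is open.
            {error, recoverable, storage_down}
    end.

%% Usage sketch mirroring do_next_v1/4 above:
%% with_running_storage({DB, Shard}, fun() ->
%%     emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize)
%% end).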
-spec do_drop_generation_v3(emqx_ds:db(), shard_id(), emqx_ds_storage_layer:gen_id()) -> ok | {error, _}. @@ -491,6 +518,15 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). +-define(SAFERPC(EXPR), + try + EXPR + catch + error:RPCError = {erpc, _} -> + {error, recoverable, RPCError} + end +). + ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, @@ -544,28 +580,34 @@ ra_drop_generation(DB, Shard, GenId) -> ra_get_streams(DB, Shard, TopicFilter, Time) -> {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimestampUs = timestamp_to_timeus(Time), - emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs). + ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)). ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). + ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - TimestampUs = timestamp_to_timeus(StartTime), - emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimestampUs). + TimeUs = timestamp_to_timeus(StartTime), + ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). + TimeUs = timestamp_to_timeus(StartTime), + ?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). ra_update_iterator(DB, Shard, Iter, DSKey) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). + ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)). ra_next(DB, Shard, Iter, BatchSize) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). + case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of + RPCError = {badrpc, _} -> + {error, recoverable, RPCError}; + Other -> + Other + end. ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), @@ -573,16 +615,20 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> ra_list_generations_with_lifetimes(DB, Shard) -> {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - Gens = emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard), - maps:map( - fun(_GenId, Data = #{since := Since, until := Until}) -> - Data#{ - since := timeus_to_timestamp(Since), - until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until) - } - end, - Gens - ). + case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of + Gens = #{} -> + maps:map( + fun(_GenId, Data = #{since := Since, until := Until}) -> + Data#{ + since := timeus_to_timestamp(Since), + until := emqx_maybe:apply(fun timeus_to_timestamp/1, Until) + } + end, + Gens + ); + Error -> + Error + end. 
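The ?SAFERPC macro introduced above converts an erpc-level crash (remote node unreachable, RPC timeout) into the {error, recoverable, _} tuple the rest of the replication layer now understands. A minimal function-style sketch of that translation; safe_rpc/1 is an illustrative name, not part of the patch:

%% Sketch only: equivalent behaviour to ?SAFERPC(EXPR), taking a fun.
safe_rpc(Fun) when is_function(Fun, 0) ->
    try
        Fun()
    catch
        %% erpc raises error:{erpc, Reason} when the remote call itself
        %% fails (e.g. noconnection or timeout); treat it as recoverable.
        error:RPCError = {erpc, _} ->
            {error, recoverable, RPCError}
    end.

%% Usage sketch mirroring ra_update_iterator/4 above:
%% safe_rpc(fun() ->
%%     emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)
%% end).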
ra_drop_shard(DB, Shard) -> ra:delete_cluster(emqx_ds_replication_layer_shard:shard_servers(DB, Shard), ?RA_TIMEOUT). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 69f5b8231..5319458e2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -21,6 +21,7 @@ -export([ open_shard/2, drop_shard/1, + shard_info/2, store_batch/3, get_streams/3, get_delete_streams/3, @@ -436,6 +437,14 @@ list_generations_with_lifetimes(ShardId) -> drop_generation(ShardId, GenId) -> gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity). +-spec shard_info(shard_id(), status) -> running | down. +shard_info(ShardId, status) -> + try get_schema_runtime(ShardId) of + #{} -> running + catch + error:badarg -> down + end. + %%================================================================================ %% gen_server for the shard %%================================================================================ From 35c43eb8a07141313d34ef88cb7d84bb514790de Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 26 Mar 2024 18:23:08 +0100 Subject: [PATCH 2/2] feat(sessds): handle recoverable errors in stream scheduler --- apps/emqx/src/emqx_persistent_session_ds.erl | 6 ++-- ...persistent_session_ds_stream_scheduler.erl | 28 ++++++++++++------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index c1ed6aabd..83ed5d465 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -571,7 +571,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session = replay_streams(Session0#{replay => Streams}, ClientInfo), {ok, [], Session}. -replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) -> +replay_streams(Session0 = #{replay := [{StreamKey, Srs0} | Rest]}, ClientInfo) -> case replay_batch(Srs0, Session0, ClientInfo) of Session = #{} -> replay_streams(Session#{replay := Rest}, ClientInfo); @@ -579,7 +579,7 @@ replay_streams(Session0 = #{replay := [{_StreamKey, Srs0} | Rest]}, ClientInfo) RetryTimeout = ?TIMEOUT_RETRY_REPLAY, ?SLOG(warning, #{ msg => "failed_to_fetch_replay_batch", - stream => Srs0, + stream => StreamKey, reason => Reason, class => recoverable, retry_in_ms => RetryTimeout @@ -867,7 +867,7 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> %% TODO: Handle unrecoverable error. 
?SLOG(info, #{ msg => "failed_to_fetch_batch", - stream => Srs1, + stream => StreamKey, reason => Reason, class => Class }), diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 286d32ef4..154f59b44 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -208,16 +208,24 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> ?SLOG(debug, #{ msg => new_stream, key => Key, stream => Stream }), - {ok, Iterator} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime - ), - NewStreamState = #srs{ - rank_x = RankX, - rank_y = RankY, - it_begin = Iterator, - it_end = Iterator - }, - emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of + {ok, Iterator} -> + NewStreamState = #srs{ + rank_x = RankX, + rank_y = RankY, + it_begin = Iterator, + it_end = Iterator + }, + emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + {error, recoverable, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_initialize_stream_iterator", + stream => Stream, + class => recoverable, + reason => Reason + }), + S + end; #srs{} -> S end.
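Taken together, the two commits make failures surface to API callers as {error, recoverable | unrecoverable, Reason} tuples instead of raised erpc errors, which is what emqx_persistent_session_ds and the stream scheduler above now pattern-match on. A hedged sketch of how a generic consumer of the emqx_ds API might distinguish the two classes; consume/3 and the retry policy are illustrative, not part of the patch:

%% Sketch only: separate recoverable errors (storage_down, {erpc, _}) from
%% unrecoverable ones when iterating. Not an exhaustive treatment of every
%% emqx_ds:next/3 return value.
consume(DB, Iterator0, BatchSize) ->
    case emqx_ds:next(DB, Iterator0, BatchSize) of
        {ok, Iterator, Batch} ->
            {ok, Iterator, Batch};
        {error, recoverable, Reason} ->
            %% Shard still starting or RPC target unreachable: retry later.
            {retry, Reason};
        {error, unrecoverable, Reason} ->
            {stop, Reason}
    end.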