From c355c9ad500b63de5dcc2774436d4f1afb7ef6a6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 22 May 2024 17:22:55 +0200 Subject: [PATCH 1/2] fix(dsrepl): properly handle transaction abort during forget site --- .../src/emqx_ds_replication_layer_meta.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index fa53ecced..387dddbcf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -694,12 +694,12 @@ ensure_site() -> forget_node(Node) -> Sites = node_sites(Node), - Results = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]), - case [Reason || {error, Reason} <- Results] of - [] -> + Result = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]), + case Result of + Ok when is_list(Ok) -> ok; - Errors -> - logger:error("Failed to forget leaving node ~p: ~p", [Node, Errors]) + {error, Reason} -> + logger:error("Failed to forget leaving node ~p: ~p", [Node, Reason]) end. %% @doc Returns sorted list of sites shards are replicated across. From e6c5c1b598c3df95f5c7a256ec7e3b864ed0aa00 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 22 May 2024 17:24:08 +0200 Subject: [PATCH 2/2] chore(dsrepl): provide more information in rebalancing log messages --- .../emqx_ds_replication_shard_allocator.erl | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl index 6d8db94e3..d198b2ddd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_shard_allocator.erl @@ -229,6 +229,7 @@ handle_transition(DB, Shard, Trans, Handler) -> domain => [emqx, ds, DB, shard_transition] }), ?tp( + debug, dsrepl_shard_transition_begin, #{shard => Shard, db => DB, transition => Trans, pid => self()} ), @@ -240,7 +241,12 @@ apply_handler(Fun, DB, Shard, Trans) -> erlang:apply(Fun, [DB, Shard, Trans]). trans_add_local(DB, Shard, {add, Site}) -> - logger:info(#{msg => "Adding new local shard replica", site => Site}), + logger:info(#{ + msg => "Adding new local shard replica", + site => Site, + db => DB, + shard => Shard + }), do_add_local(membership, DB, Shard). do_add_local(membership = Stage, DB, Shard) -> @@ -251,6 +257,8 @@ do_add_local(membership = Stage, DB, Shard) -> {error, recoverable, Reason} -> logger:warning(#{ msg => "Shard membership change failed", + db => DB, + shard => Shard, reason => Reason, retry_in => ?TRANS_RETRY_TIMEOUT }), @@ -261,10 +269,12 @@ do_add_local(readiness = Stage, DB, Shard) -> LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard), case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of ready -> - logger:info(#{msg => "Local shard replica ready"}); + logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard}); Status -> logger:warning(#{ msg => "Still waiting for local shard replica to be ready", + db => DB, + shard => Shard, status => Status, retry_in => ?TRANS_RETRY_TIMEOUT }), @@ -273,7 +283,12 @@ do_add_local(readiness = Stage, DB, Shard) -> end. trans_drop_local(DB, Shard, {del, Site}) -> - logger:info(#{msg => "Dropping local shard replica", site => Site}), + logger:info(#{ + msg => "Dropping local shard replica", + site => Site, + db => DB, + shard => Shard + }), do_drop_local(DB, Shard). do_drop_local(DB, Shard) -> @@ -293,17 +308,24 @@ do_drop_local(DB, Shard) -> end. trans_rm_unresponsive(DB, Shard, {del, Site}) -> - logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), + logger:info(#{ + msg => "Removing unresponsive shard replica", + site => Site, + db => DB, + shard => Shard + }), do_rm_unresponsive(DB, Shard, Site). do_rm_unresponsive(DB, Shard, Site) -> Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of ok -> - logger:info(#{msg => "Unresponsive shard replica removed"}); + logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard}); {error, recoverable, Reason} -> logger:warning(#{ msg => "Shard membership change failed", + db => DB, + shard => Shard, reason => Reason, retry_in => ?TRANS_RETRY_TIMEOUT }), @@ -341,6 +363,7 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) -> case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of [{Track, #transhdl{shard = Shard, trans = Trans}}] -> ?tp( + debug, dsrepl_shard_transition_end, #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} ), @@ -361,9 +384,10 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) -> State; handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> State; -handle_transition_exit(Shard, Trans, Reason, State) -> +handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) -> logger:warning(#{ msg => "Shard membership transition failed", + db => DB, shard => Shard, transition => Trans, reason => Reason,