chore(dsrepl): provide more information in rebalancing log messages

This commit is contained in:
Andrew Mayorov 2024-05-22 17:24:08 +02:00
parent c355c9ad50
commit e6c5c1b598
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed file with 30 additions and 6 deletions

View File

@ -229,6 +229,7 @@ handle_transition(DB, Shard, Trans, Handler) ->
domain => [emqx, ds, DB, shard_transition] domain => [emqx, ds, DB, shard_transition]
}), }),
?tp( ?tp(
debug,
dsrepl_shard_transition_begin, dsrepl_shard_transition_begin,
#{shard => Shard, db => DB, transition => Trans, pid => self()} #{shard => Shard, db => DB, transition => Trans, pid => self()}
), ),
@ -240,7 +241,12 @@ apply_handler(Fun, DB, Shard, Trans) ->
erlang:apply(Fun, [DB, Shard, Trans]). erlang:apply(Fun, [DB, Shard, Trans]).
trans_add_local(DB, Shard, {add, Site}) -> trans_add_local(DB, Shard, {add, Site}) ->
logger:info(#{msg => "Adding new local shard replica", site => Site}), logger:info(#{
msg => "Adding new local shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_add_local(membership, DB, Shard). do_add_local(membership, DB, Shard).
do_add_local(membership = Stage, DB, Shard) -> do_add_local(membership = Stage, DB, Shard) ->
@ -251,6 +257,8 @@ do_add_local(membership = Stage, DB, Shard) ->
{error, recoverable, Reason} -> {error, recoverable, Reason} ->
logger:warning(#{ logger:warning(#{
msg => "Shard membership change failed", msg => "Shard membership change failed",
db => DB,
shard => Shard,
reason => Reason, reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT retry_in => ?TRANS_RETRY_TIMEOUT
}), }),
@ -261,10 +269,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard), LocalServer = emqx_ds_replication_layer_shard:local_server(DB, Shard),
case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of case emqx_ds_replication_layer_shard:server_info(readiness, LocalServer) of
ready -> ready ->
logger:info(#{msg => "Local shard replica ready"}); logger:info(#{msg => "Local shard replica ready", db => DB, shard => Shard});
Status -> Status ->
logger:warning(#{ logger:warning(#{
msg => "Still waiting for local shard replica to be ready", msg => "Still waiting for local shard replica to be ready",
db => DB,
shard => Shard,
status => Status, status => Status,
retry_in => ?TRANS_RETRY_TIMEOUT retry_in => ?TRANS_RETRY_TIMEOUT
}), }),
@ -273,7 +283,12 @@ do_add_local(readiness = Stage, DB, Shard) ->
end. end.
trans_drop_local(DB, Shard, {del, Site}) -> trans_drop_local(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Dropping local shard replica", site => Site}), logger:info(#{
msg => "Dropping local shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_drop_local(DB, Shard). do_drop_local(DB, Shard).
do_drop_local(DB, Shard) -> do_drop_local(DB, Shard) ->
@ -293,17 +308,24 @@ do_drop_local(DB, Shard) ->
end. end.
trans_rm_unresponsive(DB, Shard, {del, Site}) -> trans_rm_unresponsive(DB, Shard, {del, Site}) ->
logger:info(#{msg => "Removing unresponsive shard replica", site => Site}), logger:info(#{
msg => "Removing unresponsive shard replica",
site => Site,
db => DB,
shard => Shard
}),
do_rm_unresponsive(DB, Shard, Site). do_rm_unresponsive(DB, Shard, Site).
do_rm_unresponsive(DB, Shard, Site) -> do_rm_unresponsive(DB, Shard, Site) ->
Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site),
case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of case emqx_ds_replication_layer_shard:remove_server(DB, Shard, Server) of
ok -> ok ->
logger:info(#{msg => "Unresponsive shard replica removed"}); logger:info(#{msg => "Unresponsive shard replica removed", db => DB, shard => Shard});
{error, recoverable, Reason} -> {error, recoverable, Reason} ->
logger:warning(#{ logger:warning(#{
msg => "Shard membership change failed", msg => "Shard membership change failed",
db => DB,
shard => Shard,
reason => Reason, reason => Reason,
retry_in => ?TRANS_RETRY_TIMEOUT retry_in => ?TRANS_RETRY_TIMEOUT
}), }),
@ -341,6 +363,7 @@ handle_exit(Pid, Reason, State0 = #{db := DB, transitions := Ts}) ->
case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of case maps:to_list(maps:filter(fun(_, TH) -> TH#transhdl.pid == Pid end, Ts)) of
[{Track, #transhdl{shard = Shard, trans = Trans}}] -> [{Track, #transhdl{shard = Shard, trans = Trans}}] ->
?tp( ?tp(
debug,
dsrepl_shard_transition_end, dsrepl_shard_transition_end,
#{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason} #{shard => Shard, db => DB, transition => Trans, pid => Pid, reason => Reason}
), ),
@ -361,9 +384,10 @@ handle_transition_exit(Shard, Trans, normal, State = #{db := DB}) ->
State; State;
handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) -> handle_transition_exit(_Shard, _Trans, {shutdown, skipped}, State) ->
State; State;
handle_transition_exit(Shard, Trans, Reason, State) -> handle_transition_exit(Shard, Trans, Reason, State = #{db := DB}) ->
logger:warning(#{ logger:warning(#{
msg => "Shard membership transition failed", msg => "Shard membership transition failed",
db => DB,
shard => Shard, shard => Shard,
transition => Trans, transition => Trans,
reason => Reason, reason => Reason,