diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index bc0765f27..eb4b1fc70 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -114,6 +114,7 @@ store_batch(DB, Messages, Opts) -> init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), + logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}), MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), S = #s{ @@ -234,19 +235,29 @@ do_flush( pending_replies = [] }; {error, recoverable, Reason} -> + %% Note: this is a hot loop, so we report error messages + %% with `debug' level to avoid wiping the logs. Instead, + %% error the detection must rely on the metrics. Debug + %% logging can be enabled for the particular egress server + %% via logger domain. + ?tp( + debug, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, reason => Reason, recoverable => true} + ), %% Retry sending the batch: emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), erlang:garbage_collect(), %% We block the gen_server until the next retry. BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), timer:sleep(BlockTime), - ?tp( - warning, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason} - ), S; - Err = {error, unrecoverable, _} -> + Err = {error, unrecoverable, Reason} -> + ?tp( + debug, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, reason => Reason, recoverable => false} + ), emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies), erlang:garbage_collect(),