diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 1b2ac30dd..9651c029e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -179,14 +179,18 @@ terminate(_Reason, _S) -> -define(COOLDOWN_MAX, 5000). do_flush( - S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard} + S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics} ) -> Messages = queue:to_list(Q), - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of + T0 = erlang:monotonic_time(microsecond), + Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0), + case Result of ok -> - emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), - emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), - emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), + emqx_ds_builtin_metrics:inc_egress_batches(Metrics), + emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), + emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), ?tp( emqx_ds_replication_layer_egress_flush,