diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index a47540360..8075238b3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -16,7 +16,14 @@ -module(emqx_ds_builtin_metrics). %% API: --export([child_spec/0, init_for_db/1, init_for_shard/2]). +-export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]). +-export([ + inc_egress_batches/1, + inc_egress_batches_retry/1, + inc_egress_messages/2, + inc_egress_bytes/2, + observe_egress_flush_time/2 +]). %% behavior callbacks: -export([]). @@ -24,7 +31,7 @@ %% internal exports: -export([]). --export_type([]). +-export_type([shard_metrics_id/0]). %%================================================================================ %% Type declarations @@ -32,18 +39,17 @@ -define(WORKER, ?MODULE). --define(DB_METRICS, - [ +-define(DB_METRICS, []). - ]). +-define(SHARD_METRICS, [ + 'egress.batches', + 'egress.batches.retry', + 'egress.messages', + 'egress.bytes', + {slide, 'egress.flush_time'} +]). --define(SHARD_METRICS, - [ - 'egress.bytes', - 'egress.batches', - 'egress.messages', - {slide, 'egress.flush_time'} - ]). +-type shard_metrics_id() :: binary(). %%================================================================================ %% API functions @@ -57,18 +63,39 @@ child_spec() -> init_for_db(DB) -> emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). --spec init_for_shard(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok. -init_for_shard(DB, ShardId) -> - Id = iolist_to_binary([atom_to_list(DB), $/, ShardId]), - emqx_metrics_worker:create_metrics(?WORKER, Id, ?SHARD_METRICS, []). +-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). +shard_metric_id(DB, ShardId) -> + iolist_to_binary([atom_to_list(DB), $/, ShardId]). 
-%%================================================================================ -%% behavior callbacks -%%================================================================================ +-spec init_for_shard(shard_metrics_id()) -> ok. +init_for_shard(ShardId) -> + emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). -%%================================================================================ -%% Internal exports -%%================================================================================ +%% @doc Increase the number of successfully flushed batches +-spec inc_egress_batches(shard_metrics_id()) -> ok. +inc_egress_batches(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches'). + +%% @doc Increase the number of times the egress worker had to retry +%% flushing the batch +-spec inc_egress_batches_retry(shard_metrics_id()) -> ok. +inc_egress_batches_retry(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches.retry'). + +%% @doc Increase the number of messages successfully saved to the shard +-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_messages(Id, NMessages) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.messages', NMessages). + +%% @doc Increase the number of bytes successfully saved to the shard +-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_bytes(Id, NMessages) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.bytes', NMessages). + +%% @doc Add a sample of time spent flushing the egress to the Raft log (in microseconds) +-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. +observe_egress_flush_time(Id, FlushTime) -> + emqx_metrics_worker:observe(?WORKER, Id, 'egress.flush_time', FlushTime). 
%%================================================================================ %% Internal functions diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 128aeb380..3f9188312 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -40,6 +40,7 @@ -export_type([]). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ @@ -49,8 +50,16 @@ -define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). -define(flush, flush). --record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). --record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}). +-record(enqueue_req, { + message :: emqx_types:message(), + sync :: boolean(), + payload_bytes :: non_neg_integer() +}). +-record(enqueue_atomic_req, { + batch :: [emqx_types:message()], + sync :: boolean(), + payload_bytes :: non_neg_integer() +}). 
%%================================================================================ %% API functions @@ -73,7 +82,8 @@ store_batch(DB, Messages, Opts) -> ?via(DB, Shard), #enqueue_req{ message = Message, - sync = Sync + sync = Sync, + payload_bytes = payload_size(Message) }, infinity ) @@ -83,11 +93,19 @@ store_batch(DB, Messages, Opts) -> true -> maps:foreach( fun(Shard, Batch) -> + PayloadBytes = lists:foldl( + fun(Msg, Acc) -> + Acc + payload_size(Msg) + end, + 0, + Batch + ), gen_server:call( ?via(DB, Shard), #enqueue_atomic_req{ batch = Batch, - sync = Sync + sync = Sync, + payload_bytes = PayloadBytes }, infinity ) @@ -108,7 +126,9 @@ store_batch(DB, Messages, Opts) -> -record(s, { db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), + metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), n = 0 :: non_neg_integer(), + n_bytes = 0 :: non_neg_integer(), tref :: reference(), batch = [] :: [emqx_types:message()], pending_replies = [] :: [gen_server:from()] @@ -117,18 +137,21 @@ store_batch(DB, Messages, Opts) -> init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), + MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), + ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), S = #s{ db = DB, shard = Shard, + metrics_id = MetricsId, tref = start_timer() }, {ok, S}. -handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> - do_enqueue(From, Sync, Msg, S); -handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) -> +handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) -> + do_enqueue(From, Sync, Msg, NBytes, S); +handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, Batch}, S); + do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. 
@@ -161,6 +184,11 @@ do_flush( ) -> case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> + emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), + emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), + emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} @@ -169,6 +197,7 @@ do_flush( true = erlang:garbage_collect(), ok; Error -> + emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), true = erlang:garbage_collect(), ?tp( warning, @@ -184,19 +213,27 @@ do_flush( end, S#s{ n = 0, + n_bytes = 0, batch = [], pending_replies = [], tref = start_timer() }. -do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> +do_enqueue( + From, + Sync, + MsgOrBatch, + BatchBytes, + S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies} +) -> + NBytes = NBytes0 + BatchBytes, NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = case MsgOrBatch of {atomic, NumMsgs, Msgs} -> - S0#s{n = N + NumMsgs, batch = Msgs ++ Batch}; + S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch}; Msg -> - S0#s{n = N + 1, batch = [Msg | Batch]} + S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]} end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to @@ -228,3 +265,8 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). 
+ +%% @doc Return the approximate size of the MQTT message in bytes: payload +%% plus topic only (headers and extras are not counted) +payload_size(#message{payload = P, topic = T}) -> + byte_size(P) + byte_size(T). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 8556e82d3..813f91dbd 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -212,6 +212,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), + ok = add_collect_family(Callback, ds_metric_meta(), ?MG(ds_data, RawData)), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> @@ -1011,6 +1012,18 @@ catch_all(DataFun) -> _:_ -> undefined end. +%%======================================== +%% Durable storage +%%======================================== + +ds_metric_meta() -> + [ + {emqx_ds_egress_batches, counter, 'egress.batches'}, + {emqx_ds_egress_batches_retry, counter, 'egress.batches.retry'}, + {emqx_ds_egress_messages, counter, 'egress.messages'}, + {emqx_ds_egress_bytes, counter, 'egress.bytes'} + ]. + %%-------------------------------------------------------------------- %% Collect functions %%--------------------------------------------------------------------