diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1470b7d8b..ed29ea614 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -253,8 +253,12 @@ persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of ok -> [persisted]; - {_SkipOrError, _Reason} -> - % TODO: log errors? + {skipped, _} -> + []; + {error, Recoverable, Reason} -> + ?SLOG(debug, #{ + msg => "failed_to_persist_message", is_recoverable => Recoverable, reason => Reason + }), [] end. diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 10497216d..c909c5c5f 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -98,7 +98,7 @@ pre_config_update(_Root, _NewConf, _OldConf) -> %%-------------------------------------------------------------------- -spec persist(emqx_types:message()) -> - ok | {skipped, _Reason} | {error, _TODO}. + emqx_ds:store_batch_result() | {skipped, needs_no_persistence}. persist(Msg) -> ?WHEN_ENABLED( case needs_persistence(Msg) andalso has_subscribers(Msg) of diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index fc6de2861..833e39211 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -29,7 +29,10 @@ inc_egress_bytes/2, observe_egress_flush_time/2, - observe_next_time/3 + + observe_store_batch_time/2, + + observe_next_time/2 ]). %% behavior callbacks: @@ -46,7 +49,15 @@ -define(WORKER, ?MODULE). --define(DB_METRICS, []). +-define(STORAGE_LAYER_METRICS, [ + {slide, 'emqx_ds_store_batch_time'} +]). + +-define(FETCH_METRICS, [ + {slide, 'emqx_ds_builtin_next_time'} +]). + +-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS). -define(EGRESS_METRICS, [ {counter, 'emqx_ds_egress_batches'}, @@ -57,14 +68,12 @@ {slide, 'emqx_ds_egress_flush_time'} ]). 
--define(INGRESS_METRICS, [ - {slide, 'emqx_ds_builtin_next_time'} -]). - --define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS). +-define(SHARD_METRICS, ?EGRESS_METRICS). -type shard_metrics_id() :: binary(). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%================================================================================ %% API functions %%================================================================================ @@ -75,8 +84,8 @@ child_spec() -> %% @doc Initialize metrics that are global for a DS database -spec init_for_db(emqx_ds:db()) -> ok. -init_for_db(_DB) -> - ok. +init_for_db(DB) -> + emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). -spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). shard_metric_id(DB, ShardId) -> @@ -90,43 +99,45 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). %% @doc Increase the number of time the egress worker encountered an %% unrecoverable error while trying to flush the batch -spec inc_egress_batches_failed(shard_metrics_id()) -> ok. inc_egress_batches_failed(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. 
inc_egress_messages(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). %% @doc Add a sample of elapsed time spent flushing the egress to the %% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). -%% @doc Add a sample of elapsed time spent waiting for a +-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +observe_store_batch_time({DB, _}, StoreTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime). + +%% @doc Add a sample of elapsed time spent waiting for a batch %% `emqx_ds_replication_layer:next' --spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) -> - ok. -observe_next_time(DB, Shard, NextTime) -> - Id = shard_metric_id(DB, Shard), - emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime). +-spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok. +observe_next_time(DB, NextTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime). prometheus_meta() -> lists:map( @@ -136,11 +147,56 @@ prometheus_meta() -> ({slide, A}) -> {A, counter, A} end, - ?SHARD_METRICS + ?DB_METRICS ++ ?SHARD_METRICS ). 
prometheus_collect(NodeOrAggr) -> - prometheus_per_shard(NodeOrAggr). + maps:merge(prometheus_per_db(NodeOrAggr), prometheus_per_shard(NodeOrAggr)). + +prometheus_per_db(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc) -> + prometheus_per_db(NodeOrAggr, DB, Acc) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_store_batch_time => +%% [{[{db, emqx_persistent_message}], 42}], +%% ... +%% ''' +%% +%% If `NodeOrAggr' = `node' then the node name is omitted; otherwise it is +%% appended to the list of labels. +prometheus_per_db(NodeOrAggr, DB, Acc0) -> + Labels = [ + {db, DB} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(?WORKER, DB), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). 
%% This function returns the data in the following format: %% ``` diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 2d4982af3..f8478bb72 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -332,7 +332,7 @@ next(DB, Iter0, BatchSize) -> T0 = erlang:monotonic_time(microsecond), Result = ra_next(DB, Shard, StorageIter0, BatchSize), T1 = erlang:monotonic_time(microsecond), - emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0), + emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), case Result of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index cbc0e9abf..28ce1d943 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -258,7 +258,11 @@ store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> shard => Shard, messages => Messages, options => Options }), #{module := Mod, data := GenData} = generation_at(Shard, Time), - Mod:store_batch(Shard, GenData, Messages, Options); + T0 = erlang:monotonic_time(microsecond), + Result = Mod:store_batch(Shard, GenData, Messages, Options), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result; store_batch(_Shard, [], _Options) -> ok. 
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 044e701e3..0ac032824 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -491,7 +491,9 @@ emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [ emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators