diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1470b7d8b..ed29ea614 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -253,8 +253,12 @@ persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of ok -> [persisted]; - {_SkipOrError, _Reason} -> - % TODO: log errors? + {skipped, _} -> + []; + {error, Recoverable, Reason} -> + ?SLOG(debug, #{ + msg => "failed_to_persist_message", is_recoverable => Recoverable, reason => Reason + }), [] end. diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 6b8b60209..13ac40c68 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -222,7 +222,9 @@ % Messages delivered {counter, 'messages.delivered'}, % Messages acked - {counter, 'messages.acked'} + {counter, 'messages.acked'}, + % Messages persistently stored + {counter, 'messages.persisted'} ] ). @@ -718,4 +720,5 @@ reserved_idx('overload_protection.gc') -> 403; reserved_idx('overload_protection.new_conn') -> 404; reserved_idx('messages.validation_succeeded') -> 405; reserved_idx('messages.validation_failed') -> 406; +reserved_idx('messages.persisted') -> 407; reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index e3fa23296..c909c5c5f 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -98,7 +98,7 @@ pre_config_update(_Root, _NewConf, _OldConf) -> %%-------------------------------------------------------------------- -spec persist(emqx_types:message()) -> - ok | {skipped, _Reason} | {error, _TODO}. + emqx_ds:store_batch_result() | {skipped, needs_no_persistence}. persist(Msg) -> ?WHEN_ENABLED( case needs_persistence(Msg) andalso has_subscribers(Msg) of @@ -114,6 +114,7 @@ needs_persistence(Msg) -> -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> + emqx_metrics:inc('messages.persisted'), emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). has_subscribers(#message{topic = Topic}) -> diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 13458b4b4..40f2ba2b3 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -67,7 +67,8 @@ %, sent_bytes validation_succeeded, validation_failed, - dropped + dropped, + persisted ]). -define(GAUGE_SAMPLER_LIST, [ @@ -87,7 +88,8 @@ sent => sent_msg_rate, validation_succeeded => validation_succeeded_rate, validation_failed => validation_failed_rate, - dropped => dropped_msg_rate + dropped => dropped_msg_rate, + persisted => persisted_rate }). -define(CURRENT_SAMPLE_NON_RATE, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 6a9e868dd..fe0476e6d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -428,7 +428,8 @@ stats(sent) -> emqx_metrics:val('messages.sent'); stats(sent_bytes) -> emqx_metrics:val('bytes.sent'); stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded'); stats(validation_failed) -> emqx_metrics:val('messages.validation_failed'); -stats(dropped) -> emqx_metrics:val('messages.dropped'). +stats(dropped) -> emqx_metrics:val('messages.dropped'); +stats(persisted) -> emqx_metrics:val('messages.persisted'). %% ------------------------------------------------------------------------------------------------- %% Retained && License Quota diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 3ffadc1b2..1b6773b87 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -192,6 +192,8 @@ swagger_desc(validation_succeeded) -> swagger_desc_format("Message validations succeeded "); swagger_desc(validation_failed) -> swagger_desc_format("Message validations failed "); +swagger_desc(persisted) -> + swagger_desc_format("Messages saved to the durable storage "); swagger_desc(subscriptions) -> <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(topics) -> @@ -218,6 +220,8 @@ swagger_desc(validation_succeeded_rate) -> swagger_desc_format("Message validations succeeded ", per); swagger_desc(validation_failed_rate) -> swagger_desc_format("Message validations failed ", per); +swagger_desc(persisted_rate) -> + swagger_desc_format("Messages saved to the durable storage ", per); swagger_desc(retained_msg_count) -> <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(shared_subscriptions) -> diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index f24605175..cc7a7431f 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --ifndef(EMQX_DS_HRL_HRL). --define(EMQX_DS_HRL_HRL, true). +-ifndef(EMQX_DS_HRL). +-define(EMQX_DS_HRL, true). -endif. diff --git a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl new file mode 100644 index 000000000..0a82a6682 --- /dev/null +++ b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_METRICS_HRL). +-define(EMQX_DS_METRICS_HRL, true). + +%%%% Egress metrics: + +%% Number of successfully flushed batches: +-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches). +%% Number of batch flush retries: +-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry). +%% Number of batches that weren't flushed due to unrecoverable errors: +-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed). +%% Total number of messages that were successfully committed to the storage: +-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages). +%% Total size of payloads that were successfully committed to the storage: +-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes). +%% Sliding average of flush time (microseconds): +-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time). + +%%%% Storage layer metrics: +-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time). +-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time). + +%%% LTS Storage counters: + +%% This counter is incremented when the iterator seeks to the next interval: +-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek). +%% This counter is incremented when the iterator proceeds to the next +%% key within the interval (this is is best case scenario): +-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next). +%% This counter is incremented when the key passes bitmask check, but +%% the value is rejected by the subsequent post-processing: +-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 79e2f6120..a697b9276 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -31,7 +31,7 @@ ensure_shard/1, ensure_egress/1 ]). --export([which_shards/1]). +-export([which_dbs/0, which_shards/1]). %% behaviour callbacks: -export([init/1]). @@ -104,6 +104,13 @@ ensure_egress(Shard) -> which_shards(DB) -> supervisor:which_children(?via(#?shards_sup{db = DB})). +%% @doc Return the list of builtin DS databases that are currently +%% active on the node. +-spec which_dbs() -> [emqx_ds:db()]. +which_dbs() -> + Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, + gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -111,6 +118,7 @@ which_shards(DB) -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), Children = [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl new file mode 100644 index 000000000..ce984db57 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -0,0 +1,299 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_builtin_metrics). + +%% DS-facing API: +-export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]). + +%% Prometheus-facing API: +-export([prometheus_meta/0, prometheus_collect/1]). + +-export([ + inc_egress_batches/1, + inc_egress_batches_retry/1, + inc_egress_batches_failed/1, + inc_egress_messages/2, + inc_egress_bytes/2, + + observe_egress_flush_time/2, + + observe_store_batch_time/2, + + observe_next_time/2, + + inc_lts_seek_counter/2, + inc_lts_next_counter/2, + inc_lts_collision_counter/2 +]). + +%% behavior callbacks: +-export([]). + +%% internal exports: +-export([]). + +-export_type([shard_metrics_id/0]). + +-include("emqx_ds_metrics.hrl"). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(WORKER, ?MODULE). + +-define(STORAGE_LAYER_METRICS, [ + {slide, ?DS_STORE_BATCH_TIME}, + {counter, ?DS_LTS_SEEK_COUNTER}, + {counter, ?DS_LTS_NEXT_COUNTER}, + {counter, ?DS_LTS_COLLISION_COUNTER} +]). + +-define(FETCH_METRICS, [ + {slide, ?DS_BUILTIN_NEXT_TIME} +]). + +-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS). + +-define(EGRESS_METRICS, [ + {counter, ?DS_EGRESS_BATCHES}, + {counter, ?DS_EGRESS_BATCHES_RETRY}, + {counter, ?DS_EGRESS_BATCHES_FAILED}, + {counter, ?DS_EGRESS_MESSAGES}, + {counter, ?DS_EGRESS_BYTES}, + {slide, ?DS_EGRESS_FLUSH_TIME} +]). + +-define(SHARD_METRICS, ?EGRESS_METRICS). + +-type shard_metrics_id() :: binary(). + +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec child_spec() -> supervisor:child_spec(). +child_spec() -> + emqx_metrics_worker:child_spec(?WORKER). + +%% @doc Initialize metrics that are global for a DS database +-spec init_for_db(emqx_ds:db()) -> ok. +init_for_db(DB) -> + emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). + +-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). +shard_metric_id(DB, ShardId) -> + iolist_to_binary([atom_to_list(DB), $/, ShardId]). + +%% @doc Initialize metrics that are specific for the shard. +-spec init_for_shard(shard_metrics_id()) -> ok. +init_for_shard(ShardId) -> + emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). + +%% @doc Increase the number of successfully flushed batches +-spec inc_egress_batches(shard_metrics_id()) -> ok. +inc_egress_batches(Id) -> + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES). + +%% @doc Increase the number of time the egress worker had to retry +%% flushing the batch +-spec inc_egress_batches_retry(shard_metrics_id()) -> ok. +inc_egress_batches_retry(Id) -> + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY). + +%% @doc Increase the number of time the egress worker encountered an +%% unrecoverable error while trying to flush the batch +-spec inc_egress_batches_failed(shard_metrics_id()) -> ok. +inc_egress_batches_failed(Id) -> + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED). + +%% @doc Increase the number of messages successfully saved to the shard +-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_messages(Id, NMessages) -> + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages). + +%% @doc Increase the number of messages successfully saved to the shard +-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_bytes(Id, NMessages) -> + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages). + +%% @doc Add a sample of elapsed time spent flushing the egress to the +%% Raft log (in microseconds) +-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. +observe_egress_flush_time(Id, FlushTime) -> + catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime). + +-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +observe_store_batch_time({DB, _}, StoreTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_STORE_BATCH_TIME, StoreTime). + +%% @doc Add a sample of elapsed time spent waiting for a batch +%% `emqx_ds_replication_layer:next' +-spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok. +observe_next_time(DB, NextTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_BUILTIN_NEXT_TIME, NextTime). + +-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_seek_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc). + +-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_next_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc). + +-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_collision_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc). + +prometheus_meta() -> + lists:map( + fun + ({counter, A}) -> + {A, counter, A}; + ({slide, A}) -> + {A, counter, A} + end, + ?DB_METRICS ++ ?SHARD_METRICS + ). + +prometheus_collect(NodeOrAggr) -> + maps:merge(prometheus_per_db(NodeOrAggr), prometheus_per_shard(NodeOrAggr)). + +prometheus_per_db(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc) -> + prometheus_per_db(NodeOrAggr, DB, Acc) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_store_batch_time => +%% [{[{db, emqx_persistent_message}], 42}], +%% ... +%% ''' +%% +%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% labels. +prometheus_per_db(NodeOrAggr, DB, Acc0) -> + Labels = [ + {db, DB} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(?WORKER, DB), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_egress_batches => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}], +%% emqx_ds_egress_batches_retry => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}], +%% emqx_ds_egress_messages => +%% ... +%% } +%% ''' +%% +%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% labels. +prometheus_per_shard(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc0) -> + lists:foldl( + fun(Shard, Acc) -> + prometheus_per_shard(NodeOrAggr, DB, Shard, Acc) + end, + Acc0, + emqx_ds_replication_layer_meta:shards(DB) + ) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). + +prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) -> + Labels = [ + {db, DB}, + {shard, Shard} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics( + ?WORKER, shard_metric_id(DB, Shard) + ), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). + +-spec append_to_key(K, V, #{K => [V]}) -> #{K => [V]}. +append_to_key(Key, Value, Map) -> + maps:update_with( + Key, + fun(L) -> + [Value | L] + end, + [Value], + Map + ). + +slide_value(Interval, Value, Labels0) -> + Labels = [{interval, Interval} | Labels0], + {Labels, maps:get(Interval, Value, 0)}. + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 50ed18de1..45e81bdc9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -81,6 +81,7 @@ stop_db(DB) -> %% Chidren are attached dynamically to this one. init(?top) -> %% Children: + MetricsWorker = emqx_ds_builtin_metrics:child_spec(), MetadataServer = #{ id => metadata_server, start => {emqx_ds_replication_layer_meta, start_link, []}, @@ -102,7 +103,7 @@ init(?top) -> period => 1, auto_shutdown => never }, - {ok, {SupFlags, [MetadataServer, DBsSup]}}; + {ok, {SupFlags, [MetricsWorker, MetadataServer, DBsSup]}}; init(?databases) -> %% Children are added dynamically: SupFlags = #{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 14c2268b8..ecc6a492e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -329,7 +329,11 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case ra_next(DB, Shard, StorageIter0, BatchSize) of + T0 = erlang:monotonic_time(microsecond), + Result = ra_next(DB, Shard, StorageIter0, BatchSize), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), + case Result of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; @@ -547,6 +551,8 @@ list_nodes() -> end ). +-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 128aeb380..4122d937d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -40,6 +40,7 @@ -export_type([]). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ @@ -49,8 +50,13 @@ -define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). -define(flush, flush). --record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). --record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}). +-record(enqueue_req, { + messages :: [emqx_types:message()], + sync :: boolean(), + atomic :: boolean(), + n_messages :: non_neg_integer(), + payload_bytes :: non_neg_integer() +}). %%================================================================================ %% API functions @@ -61,44 +67,32 @@ start_link(DB, Shard) -> gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - ok. + emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> Sync = maps:get(sync, Opts, true), - case maps:get(atomic, Opts, false) of - false -> - lists:foreach( - fun(Message) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - gen_server:call( - ?via(DB, Shard), - #enqueue_req{ - message = Message, - sync = Sync - }, - infinity - ) - end, - Messages + Atomic = maps:get(atomic, Opts, false), + %% Usually we expect all messages in the batch to go into the + %% single shard, so this function is optimized for the happy case. + case shards_of_batch(DB, Messages) of + [{Shard, {NMsgs, NBytes}}] -> + %% Happy case: + enqueue_call_or_cast( + ?via(DB, Shard), + #enqueue_req{ + messages = Messages, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + } ); - true -> - maps:foreach( - fun(Shard, Batch) -> - gen_server:call( - ?via(DB, Shard), - #enqueue_atomic_req{ - batch = Batch, - sync = Sync - }, - infinity - ) - end, - maps:groups_from_list( - fun(Message) -> - emqx_ds_replication_layer:shard_of_message(DB, Message, clientid) - end, - Messages - ) - ) + [_, _ | _] when Atomic -> + %% It's impossible to commit a batch to multiple shards + %% atomically + {error, unrecoverable, atomic_commit_to_multiple_shards}; + _Shards -> + %% Use a slower implementation for the unlikely case: + repackage_messages(DB, Messages, Sync) end. %%================================================================================ @@ -108,35 +102,65 @@ store_batch(DB, Messages, Opts) -> -record(s, { db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), + metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), + n_retries = 0 :: non_neg_integer(), + %% FIXME: Currently max_retries is always 0, because replication + %% layer doesn't guarantee idempotency. Retrying would create + %% duplicate messages. + max_retries = 0 :: non_neg_integer(), n = 0 :: non_neg_integer(), - tref :: reference(), - batch = [] :: [emqx_types:message()], + n_bytes = 0 :: non_neg_integer(), + tref :: undefined | reference(), + queue :: queue:queue(emqx_types:message()), pending_replies = [] :: [gen_server:from()] }). init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), + logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}), + MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), + ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), S = #s{ db = DB, shard = Shard, - tref = start_timer() + metrics_id = MetricsId, + queue = queue:new() }, - {ok, S}. + {ok, start_timer(S)}. -handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> - do_enqueue(From, Sync, Msg, S); -handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) -> - Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, Batch}, S); +handle_call( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + From, + S0 = #s{pending_replies = Replies0} +) -> + S = S0#s{pending_replies = [From | Replies0]}, + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. +handle_cast( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + S +) -> + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_cast(_Cast, S) -> {noreply, S}. handle_info(?flush, S) -> - {noreply, do_flush(S)}; + {noreply, flush(S)}; handle_info(_Info, S) -> {noreply, S}. @@ -151,80 +175,215 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ +enqueue( + Sync, + Atomic, + Msgs, + BatchSize, + BatchBytes, + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} +) -> + %% At this point we don't split the batches, even when they aren't + %% atomic. It wouldn't win us anything in terms of memory, and + %% EMQX currently feeds data to DS in very small batches, so + %% granularity should be fine enough. + NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), + NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), + NMsgs = NMsgs0 + BatchSize, + NBytes = NBytes0 + BatchBytes, + case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of + true -> + %% Adding this batch would cause buffer to overflow. Flush + %% it now, and retry: + cancel_timer(S0), + S1 = flush(S0), + enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + false -> + %% The buffer is empty, we enqueue the atomic batch in its + %% entirety: + Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), + S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, + case NMsgs >= NMax orelse NBytes >= NBytes of + true -> + cancel_timer(S1), + flush(S1); + false -> + S1 + end + end. + -define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MAX, 5000). -do_flush(S = #s{batch = []}) -> - S#s{tref = start_timer()}; +flush(S) -> + start_timer(do_flush(S)). + +do_flush(S0 = #s{n = 0}) -> + S0; do_flush( - S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} + S = #s{ + queue = Q, + pending_replies = Replies, + db = DB, + shard = Shard, + metrics_id = Metrics, + n_retries = Retries, + max_retries = MaxRetries + } ) -> - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + Messages = queue:to_list(Q), + T0 = erlang:monotonic_time(microsecond), + Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0), + case Result of ok -> + emqx_ds_builtin_metrics:inc_egress_batches(Metrics), + emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), + emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} ), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), - ok; - Error -> - true = erlang:garbage_collect(), + erlang:garbage_collect(), + S#s{ + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [] + }; + {timeout, ServerId} when Retries < MaxRetries -> + %% Note: this is a hot loop, so we report error messages + %% with `debug' level to avoid wiping the logs. Instead, + %% error the detection must rely on the metrics. Debug + %% logging can be enabled for the particular egress server + %% via logger domain. ?tp( - warning, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Error} + debug, + emqx_ds_replication_layer_egress_flush_retry, + #{db => DB, shard => Shard, reason => timeout, server_id => ServerId} ), - Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - %% Since we drop the entire batch here, we at least reply callers with an - %% error so they don't hang indefinitely in the `gen_server' call with - %% `infinity' timeout. - lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies) - end, - S#s{ - n = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }. + %% Retry sending the batch: + emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), + erlang:garbage_collect(), + %% We block the gen_server until the next retry. + BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + timer:sleep(BlockTime), + S#s{n_retries = Retries + 1}; + Err -> + ?tp( + debug, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, error => Err} + ), + emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), + Reply = + case Err of + {error, _, _} -> Err; + {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}}; + _ -> {error, unrecoverable, Err} + end, + lists:foreach( + fun(From) -> gen_server:reply(From, Reply) end, Replies + ), + erlang:garbage_collect(), + S#s{ + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [], + n_retries = 0 + } + end. -do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> - NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - S1 = - case MsgOrBatch of - {atomic, NumMsgs, Msgs} -> - S0#s{n = N + NumMsgs, batch = Msgs ++ Batch}; - Msg -> - S0#s{n = N + 1, batch = [Msg | Batch]} - end, - %% TODO: later we may want to delay the reply until the message is - %% replicated, but it requies changes to the PUBACK/PUBREC flow to - %% allow for async replies. For now, we ack when the message is - %% _buffered_ rather than stored. - %% - %% Otherwise, the client would freeze for at least flush interval, - %% or until the buffer is filled. - S2 = - case Sync of - true -> - S1#s{pending_replies = [From | Replies]}; - false -> - gen_server:reply(From, ok), - S1 - end, - S = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S2#s.tref), - do_flush(S2); - false -> - S2 - end, - %% TODO: add a backpressure mechanism for the server to avoid - %% building a long message queue. - {noreply, S}. +-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> + [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] +when + NMessages :: non_neg_integer(), + NBytes :: non_neg_integer(). +shards_of_batch(DB, Messages) -> + maps:to_list( + lists:foldl( + fun(Message, Acc) -> + %% TODO: sharding strategy must be part of the DS DB schema: + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S}) -> + {N + 1, S + Size} + end, + {1, Size}, + Acc + ) + end, + #{}, + Messages + ) + ). -start_timer() -> +repackage_messages(DB, Messages, Sync) -> + Batches = lists:foldl( + fun(Message, Acc) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S, Msgs}) -> + {N + 1, S + Size, [Message | Msgs]} + end, + {1, Size, [Message]}, + Acc + ) + end, + #{}, + Messages + ), + maps:fold( + fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + Err = enqueue_call_or_cast( + ?via(DB, Shard), + #enqueue_req{ + messages = lists:reverse(RevMessages), + sync = Sync, + atomic = false, + n_messages = NMsgs, + payload_bytes = ByteSize + } + ), + compose_errors(ErrAcc, Err) + end, + ok, + Batches + ). + +enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) -> + gen_server:call(To, Req, infinity); +enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) -> + gen_server:cast(To, Req). + +compose_errors(ErrAcc, ok) -> + ErrAcc; +compose_errors(ok, Err) -> + Err; +compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> + {error, unrecoverable, Err}; +compose_errors(ErrAcc, _Err) -> + ErrAcc. + +start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - erlang:send_after(Interval, self(), ?flush). + Tref = erlang:send_after(Interval, self(), ?flush), + S#s{tref = Tref}. + +cancel_timer(#s{tref = undefined}) -> + ok; +cancel_timer(#s{tref = TRef}) -> + _ = erlang:cancel_timer(TRef), + ok. + +%% @doc Return approximate size of the MQTT message (it doesn't take +%% all things into account, for example headers and extras) +payload_size(#message{payload = P, topic = T}) -> + size(P) + size(T). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 594854d21..2ec6674b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -44,6 +44,7 @@ -export_type([options/0]). +-include("emqx_ds_metrics.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -115,8 +116,6 @@ ?last_seen_key := binary() }. --define(COUNTER, emqx_ds_storage_bitfield_lts_counter). - %% Limit on the number of wildcard levels in the learned topic trie: -define(WILDCARD_LIMIT, 10). @@ -140,6 +139,8 @@ -define(DIM_TOPIC, 1). -define(DIM_TS, 2). +-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -347,13 +348,18 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> +next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_ds:timestamp_us(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - next_until(Schema, It, SafeCutoffTime, BatchSize). + try + next_until(Schema, It, SafeCutoffTime, BatchSize) + after + report_counters(Shard) + end. next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when StartTime >= SafeCutoffTime @@ -375,20 +381,23 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, filter := Filter } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers), try - put(?COUNTER, 0), next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. -delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize). + try + delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) + after + report_counters(Shard) + end. delete_next_until( _Schema, @@ -417,7 +426,6 @@ delete_next_until( DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers ), try - put(?COUNTER, 0), LoopContext = LoopContext0#{ db => DB, cf => CF, @@ -430,8 +438,7 @@ delete_next_until( }, delete_next_loop(LoopContext) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. %%================================================================================ @@ -477,7 +484,6 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> - inc_counter(), #{?tag := ?IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -485,6 +491,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> Key1 -> %% assert true = Key1 > Key0, + inc_counter(?DS_LTS_SEEK_COUNTER), case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> {N, It, Acc} = traverse_interval( @@ -510,6 +517,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - Acc = [{Key, Msg} | Acc0], traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> + inc_counter(?DS_LTS_COLLISION_COUNTER), traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; overflow -> @@ -521,7 +529,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> - inc_counter(), + inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); @@ -541,7 +549,7 @@ delete_next_loop(LoopContext0) -> iterated_over := AccIter0, it_handle := ITHandle } = LoopContext0, - inc_counter(), + inc_counter(?DS_LTS_SEEK_COUNTER), #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -623,7 +631,7 @@ delete_traverse_interval1(LoopContext0) -> iterated_over := AccIter, storage_iter := It } = LoopContext0, - inc_counter(), + inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> delete_traverse_interval(LoopContext0#{ @@ -767,9 +775,20 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) -> read_persisted_trie(_IT, {error, invalid_iterator}) -> []. -inc_counter() -> - N = get(?COUNTER), - put(?COUNTER, N + 1). +inc_counter(Counter) -> + N = get(Counter), + put(Counter, N + 1). + +init_counters() -> + _ = [put(I, 0) || I <- ?DS_LTS_COUNTERS], + ok. + +report_counters(Shard) -> + emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)), + _ = [erase(I) || I <- ?DS_LTS_COUNTERS], + ok. %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 6b85328b6..28ce1d943 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -254,8 +254,15 @@ drop_shard(Shard) -> store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. + ?tp(emqx_ds_storage_layer_store_batch, #{ + shard => Shard, messages => Messages, options => Options + }), #{module := Mod, data := GenData} = generation_at(Shard, Time), - Mod:store_batch(Shard, GenData, Messages, Options); + T0 = erlang:monotonic_time(microsecond), + Result = Mod:store_batch(Shard, GenData, Messages, Options), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result; store_batch(_Shard, [], _Options) -> ok. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 3df16dc1c..1d2daacbb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -322,17 +322,10 @@ t_09_atomic_store_batch(_Config) -> sync => true }) ), - - ok + {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}), + ?assertMatch(#{batch := [_, _, _]}, Flush) end, - fun(Trace) -> - %% Must contain exactly one flush with all messages. - ?assertMatch( - [#{batch := [_, _, _]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) - ), - ok - end + [] ), ok. @@ -355,14 +348,15 @@ t_10_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Should contain one flush per message. + Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)), + ?assertMatch([_], Batches), ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + [_, _, _], + lists:append(Batches) ), ok end @@ -681,10 +675,83 @@ t_error_mapping_replication_layer(_Config) -> length([error || {error, _, _} <- Results2]) > 0, Results2 ), - - snabbkaffe:stop(), meck:unload(). +%% This testcase verifies the behavior of `store_batch' operation +%% when the underlying code experiences recoverable or unrecoverable +%% problems. +t_store_batch_fail(_Config) -> + ?check_trace( + #{timetrap => 15_000}, + try + meck:new(emqx_ds_storage_layer, [passthrough, no_history]), + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), + %% Success: + Batch1 = [ + message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), + %% Inject unrecoverable error: + meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) -> + {error, unrecoverable, mock} + end), + Batch2 = [ + message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1) + ], + ?assertMatch( + {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) + ), + meck:unload(emqx_ds_storage_layer), + %% Inject a recoveralbe error: + meck:new(ra, [passthrough, no_history]), + meck:expect(ra, process_command, fun(Servers, Shard, Command) -> + ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}), + {timeout, mock} + end), + Batch3 = [ + message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), + message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), + message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), + message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) + ], + %% Note: due to idempotency issues the number of retries + %% is currently set to 0: + ?assertMatch( + {error, recoverable, {timeout, mock}}, + emqx_ds:store_batch(DB, Batch3, #{sync => true}) + ), + meck:unload(ra), + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), + lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) + after + meck:unload() + end, + [ + {"message ordering", fun(StoredMessages, _Trace) -> + [{_, Stream1}, {_, Stream2}] = StoredMessages, + ?assertMatch( + [ + #message{payload = <<"1">>}, + #message{payload = <<"2">>}, + #message{payload = <<"5">>}, + #message{payload = <<"7">>} + ], + Stream1 + ), + ?assertMatch( + [ + #message{payload = <<"6">>}, + #message{payload = <<"8">>} + ], + Stream2 + ) + end} + ] + ). + update_data_set() -> [ [ @@ -748,6 +815,7 @@ init_per_testcase(_TC, Config) -> Config. end_per_testcase(_TC, _Config) -> + snabbkaffe:stop(), ok = application:stop(emqx_durable_storage), mria:stop(), _ = mnesia:delete_schema([node()]), diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 636b57b89..78838e675 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -261,8 +261,7 @@ t_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Must contain exactly one flush with all messages. @@ -293,19 +292,18 @@ t_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + Msgs end, - fun(Trace) -> - %% Should contain one flush per message. - ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + fun(ExpectedMsgs, Trace) -> + ProcessedMsgs = lists:append( + ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)) ), - ok + ?assertEqual( + ExpectedMsgs, + ProcessedMsgs + ) end - ), - ok. + ). check(Shard, TopicFilter, StartTime, ExpectedMessages) -> ExpectedFiltered = lists:filter( diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 44c45248b..be4f7bcdf 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -63,9 +63,17 @@ consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0). consume(DB, TopicFilter, StartTime) -> - Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), lists:flatmap( - fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end, + fun({_Stream, Msgs}) -> + Msgs + end, + consume_per_stream(DB, TopicFilter, StartTime) + ). + +consume_per_stream(DB, TopicFilter, StartTime) -> + Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), + lists:map( + fun({_Rank, Stream}) -> {Stream, consume_stream(DB, Stream, TopicFilter, StartTime)} end, Streams ). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 8556e82d3..450033f18 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -37,6 +37,7 @@ -include_lib("public_key/include/public_key.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl"). -import( prometheus_model_helpers, @@ -212,11 +213,30 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), + ok = maybe_add_ds_collect_family(Callback, RawData), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> ok. +maybe_add_ds_collect_family(Callback, RawData) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + add_collect_family( + Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData) + ); + false -> + ok + end. + +maybe_collect_ds_data(Mode) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + #{ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode)}; + false -> + #{} + end. + %% @private collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), @@ -251,7 +271,7 @@ add_collect_family(Name, Data, Callback, Type) -> %% behaviour fetch_from_local_node(Mode) -> - {node(), #{ + {node(), (maybe_collect_ds_data(Mode))#{ stats_data => stats_data(Mode), vm_data => vm_data(Mode), cluster_data => cluster_data(Mode), @@ -480,7 +500,19 @@ emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); +%% DS +emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators diff --git a/apps/emqx_utils/README.md b/apps/emqx_utils/README.md index f8c386f3d..d03b34c64 100644 --- a/apps/emqx_utils/README.md +++ b/apps/emqx_utils/README.md @@ -16,6 +16,7 @@ handling, data conversions, and more. - `emqx_utils_json`: JSON encoding and decoding - `emqx_utils_maps`: convenience functions for map lookup and manipulation like deep_get etc. +- `emqx_metrics`: counters, gauges, slides ## Contributing diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx_utils/src/emqx_metrics_worker.erl similarity index 100% rename from apps/emqx/src/emqx_metrics_worker.erl rename to apps/emqx_utils/src/emqx_metrics_worker.erl diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl similarity index 99% rename from apps/emqx/test/emqx_metrics_worker_SUITE.erl rename to apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl index 387e069cf..15866feb0 100644 --- a/apps/emqx/test/emqx_metrics_worker_SUITE.erl +++ b/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl @@ -31,18 +31,17 @@ suite() -> -define(NAME, ?MODULE). init_per_suite(Config) -> - {ok, _} = emqx_metrics_worker:start_link(?NAME), Config. end_per_suite(_Config) -> - ok = emqx_metrics_worker:stop(?NAME). + ok. init_per_testcase(_, Config) -> - ok = emqx_metrics_worker:stop(?NAME), {ok, _} = emqx_metrics_worker:start_link(?NAME), Config. end_per_testcase(_, _Config) -> + ok = emqx_metrics_worker:stop(?NAME), ok. t_get_metrics(_) -> diff --git a/changes/ce/feat-12781.en.md b/changes/ce/feat-12781.en.md new file mode 100644 index 000000000..c884ccbc4 --- /dev/null +++ b/changes/ce/feat-12781.en.md @@ -0,0 +1,29 @@ +Added metrics related to EMQX durable storage to Prometheus. + +New metrics: + +- `emqx_ds_egress_batches` + +- `emqx_ds_egress_batches_retry` + +- `emqx_ds_egress_batches_failed` + +- `emqx_ds_egress_messages` + +- `emqx_ds_egress_bytes` + +- `emqx_ds_egress_flush_time` + +- `emqx_ds_store_batch_time` + +- `emqx_ds_builtin_next_time` + +- `emqx_ds_storage_bitfield_lts_counter_seek` + +- `emqx_ds_storage_bitfield_lts_counter_next` + +- `emqx_ds_storage_bitfield_lts_counter_collision` + +Note: these metrics are only visible when session persistence is enabled. + +Number of persisted messages has been also added to the dashboard.