From d8204021dca78e1914ee37a77c2622182f5dcbf0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 21 Mar 2024 12:32:30 +0100 Subject: [PATCH 01/16] refactor(metrics): Move metrics worker to emqx_utils application --- apps/emqx_utils/README.md | 1 + apps/{emqx => emqx_utils}/src/emqx_metrics_worker.erl | 0 apps/{emqx => emqx_utils}/test/emqx_metrics_worker_SUITE.erl | 0 3 files changed, 1 insertion(+) rename apps/{emqx => emqx_utils}/src/emqx_metrics_worker.erl (100%) rename apps/{emqx => emqx_utils}/test/emqx_metrics_worker_SUITE.erl (100%) diff --git a/apps/emqx_utils/README.md b/apps/emqx_utils/README.md index f8c386f3d..d03b34c64 100644 --- a/apps/emqx_utils/README.md +++ b/apps/emqx_utils/README.md @@ -16,6 +16,7 @@ handling, data conversions, and more. - `emqx_utils_json`: JSON encoding and decoding - `emqx_utils_maps`: convenience functions for map lookup and manipulation like deep_get etc. +- `emqx_metrics`: counters, gauges, slides ## Contributing diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx_utils/src/emqx_metrics_worker.erl similarity index 100% rename from apps/emqx/src/emqx_metrics_worker.erl rename to apps/emqx_utils/src/emqx_metrics_worker.erl diff --git a/apps/emqx/test/emqx_metrics_worker_SUITE.erl b/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl similarity index 100% rename from apps/emqx/test/emqx_metrics_worker_SUITE.erl rename to apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl From c9de336234f5843fff674a3f0896a3fa86ea08fa Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 21 Mar 2024 12:33:34 +0100 Subject: [PATCH 02/16] feat(ds): Add metrics worker to the builtin db supervision tree --- .../src/emqx_ds_builtin_metrics.erl | 75 +++++++++++++++++++ .../src/emqx_ds_builtin_sup.erl | 3 +- 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl diff --git 
a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl new file mode 100644 index 000000000..a47540360 --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_ds_builtin_metrics). + +%% API: +-export([child_spec/0, init_for_db/1, init_for_shard/2]). + +%% behavior callbacks: +-export([]). + +%% internal exports: +-export([]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +-define(WORKER, ?MODULE). + +-define(DB_METRICS, + [ + + ]). + +-define(SHARD_METRICS, + [ + 'egress.bytes', + 'egress.batches', + 'egress.messages', + {slide, 'egress.flush_time'} + ]). + +%%================================================================================ +%% API functions +%%================================================================================ + +-spec child_spec() -> supervisor:child_spec(). +child_spec() -> + emqx_metrics_worker:child_spec(?WORKER). + +-spec init_for_db(emqx_ds:db()) -> ok. 
+init_for_db(DB) -> + emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). + +-spec init_for_shard(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok. +init_for_shard(DB, ShardId) -> + Id = iolist_to_binary([atom_to_list(DB), $/, ShardId]), + emqx_metrics_worker:create_metrics(?WORKER, Id, ?SHARD_METRICS, []). + +%%================================================================================ +%% behavior callbacks +%%================================================================================ + +%%================================================================================ +%% Internal exports +%%================================================================================ + +%%================================================================================ +%% Internal functions +%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 50ed18de1..45e81bdc9 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -81,6 +81,7 @@ stop_db(DB) -> %% Chidren are attached dynamically to this one. 
init(?top) -> %% Children: + MetricsWorker = emqx_ds_builtin_metrics:child_spec(), MetadataServer = #{ id => metadata_server, start => {emqx_ds_replication_layer_meta, start_link, []}, @@ -102,7 +103,7 @@ init(?top) -> period => 1, auto_shutdown => never }, - {ok, {SupFlags, [MetadataServer, DBsSup]}}; + {ok, {SupFlags, [MetricsWorker, MetadataServer, DBsSup]}}; init(?databases) -> %% Children are added dynamically: SupFlags = #{ From 606f2a88cd11e2eef067f247a73a89ff47699cd0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:03:33 +0100 Subject: [PATCH 03/16] feat(ds): Add egress metrics --- .../src/emqx_ds_builtin_metrics.erl | 71 +++++++++++++------ .../src/emqx_ds_replication_layer_egress.erl | 64 ++++++++++++++--- apps/emqx_prometheus/src/emqx_prometheus.erl | 13 ++++ 3 files changed, 115 insertions(+), 33 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index a47540360..8075238b3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -16,7 +16,14 @@ -module(emqx_ds_builtin_metrics). %% API: --export([child_spec/0, init_for_db/1, init_for_shard/2]). +-export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]). +-export([ + inc_egress_batches/1, + inc_egress_batches_retry/1, + inc_egress_messages/2, + inc_egress_bytes/2, + observe_egress_flush_time/2 +]). %% behavior callbacks: -export([]). @@ -24,7 +31,7 @@ %% internal exports: -export([]). --export_type([]). +-export_type([shard_metrics_id/0]). %%================================================================================ %% Type declarations @@ -32,18 +39,17 @@ -define(WORKER, ?MODULE). --define(DB_METRICS, - [ +-define(DB_METRICS, []). - ]). 
+-define(SHARD_METRICS, [ + 'egress.batches', + 'egress.batches.retry', + 'egress.messages', + 'egress.bytes', + {slide, 'egress.flush_time'} +]). --define(SHARD_METRICS, - [ - 'egress.bytes', - 'egress.batches', - 'egress.messages', - {slide, 'egress.flush_time'} - ]). +-type shard_metrics_id() :: binary(). %%================================================================================ %% API functions @@ -57,18 +63,39 @@ child_spec() -> init_for_db(DB) -> emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). --spec init_for_shard(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok. -init_for_shard(DB, ShardId) -> - Id = iolist_to_binary([atom_to_list(DB), $/, ShardId]), - emqx_metrics_worker:create_metrics(?WORKER, Id, ?SHARD_METRICS, []). +-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). +shard_metric_id(DB, ShardId) -> + iolist_to_binary([atom_to_list(DB), $/, ShardId]). -%%================================================================================ -%% behavior callbacks -%%================================================================================ +-spec init_for_shard(shard_metrics_id()) -> ok. +init_for_shard(ShardId) -> + emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). -%%================================================================================ -%% Internal exports -%%================================================================================ +%% @doc Increase the number of successfully flushed batches +-spec inc_egress_batches(shard_metrics_id()) -> ok. +inc_egress_batches(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches'). + +%% @doc Increase the number of time the egress worker had to retry +%% flushing the batch +-spec inc_egress_batches_retry(shard_metrics_id()) -> ok. +inc_egress_batches_retry(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches.retry'). 
+ +%% @doc Increase the number of messages successfully saved to the shard +-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_messages(Id, NMessages) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.messages', NMessages). + +%% @doc Increase the number of messages successfully saved to the shard +-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. +inc_egress_bytes(Id, NMessages) -> + emqx_metrics_worker:inc(?WORKER, Id, 'egress.bytes', NMessages). + +%% @doc Add a sample of time spent flushing the egress to the Raft log (in microseconds) +-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. +observe_egress_flush_time(Id, FlushTime) -> + emqx_metrics_worker:observe(?WORKER, Id, 'egress.flush_time', FlushTime). %%================================================================================ %% Internal functions diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 128aeb380..3f9188312 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -40,6 +40,7 @@ -export_type([]). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ @@ -49,8 +50,16 @@ -define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). -define(flush, flush). --record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}). --record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}). +-record(enqueue_req, { + message :: emqx_types:message(), + sync :: boolean(), + payload_bytes :: non_neg_integer() +}). +-record(enqueue_atomic_req, { + batch :: [emqx_types:message()], + sync :: boolean(), + payload_bytes :: non_neg_integer() +}). 
%%================================================================================ %% API functions @@ -73,7 +82,8 @@ store_batch(DB, Messages, Opts) -> ?via(DB, Shard), #enqueue_req{ message = Message, - sync = Sync + sync = Sync, + payload_bytes = payload_size(Message) }, infinity ) @@ -83,11 +93,19 @@ store_batch(DB, Messages, Opts) -> true -> maps:foreach( fun(Shard, Batch) -> + PayloadBytes = lists:foldl( + fun(Msg, Acc) -> + Acc + payload_size(Msg) + end, + 0, + Batch + ), gen_server:call( ?via(DB, Shard), #enqueue_atomic_req{ batch = Batch, - sync = Sync + sync = Sync, + payload_bytes = PayloadBytes }, infinity ) @@ -108,7 +126,9 @@ store_batch(DB, Messages, Opts) -> -record(s, { db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), + metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), n = 0 :: non_neg_integer(), + n_bytes = 0 :: non_neg_integer(), tref :: reference(), batch = [] :: [emqx_types:message()], pending_replies = [] :: [gen_server:from()] @@ -117,18 +137,21 @@ store_batch(DB, Messages, Opts) -> init([DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), + MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), + ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), S = #s{ db = DB, shard = Shard, + metrics_id = MetricsId, tref = start_timer() }, {ok, S}. -handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) -> - do_enqueue(From, Sync, Msg, S); -handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) -> +handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) -> + do_enqueue(From, Sync, Msg, NBytes, S); +handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, Batch}, S); + do_enqueue(From, Sync, {atomic, Len, NBytes, Batch}, NBytes, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. 
@@ -161,6 +184,11 @@ do_flush( ) -> case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of ok -> + emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), + emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), + emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), + true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} @@ -169,6 +197,7 @@ do_flush( true = erlang:garbage_collect(), ok; Error -> + emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), true = erlang:garbage_collect(), ?tp( warning, @@ -184,19 +213,27 @@ do_flush( end, S#s{ n = 0, + n_bytes = 0, batch = [], pending_replies = [], tref = start_timer() }. -do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> +do_enqueue( + From, + Sync, + MsgOrBatch, + BatchBytes, + S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies} +) -> + NBytes = NBytes0 + BatchBytes, NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = case MsgOrBatch of {atomic, NumMsgs, Msgs} -> - S0#s{n = N + NumMsgs, batch = Msgs ++ Batch}; + S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch}; Msg -> - S0#s{n = N + 1, batch = [Msg | Batch]} + S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]} end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to @@ -228,3 +265,8 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), ?flush). 
+ +%% @doc Return approximate size of the MQTT message (it doesn't take +%% all things into account, for example headers and extras) +payload_size(#message{payload = P, topic = T}) -> + size(P) + size(T). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 8556e82d3..813f91dbd 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -212,6 +212,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), + ok = add_collect_family(Callback, ds_metric_meta(), ?MG(ds_data, RawData)), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> @@ -1011,6 +1012,18 @@ catch_all(DataFun) -> _:_ -> undefined end. +%%======================================== +%% Durable storge +%%======================================== + +ds_metric_meta() -> + [ + {emqx_ds_egress_batches, counter, 'egress.batches'}, + {emqx_ds_egress_batches_retry, counter, 'egress.batches.retry'}, + {emqx_ds_egress_messages, counter, 'egress.messages'}, + {emqx_ds_egress_bytes, counter, 'egress.bytes'} + ]. 
+ %%-------------------------------------------------------------------- %% Collect functions %%-------------------------------------------------------------------- From 044f3d4ef5d0863b90492fa52fd2cfb6ebe396f6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 25 Mar 2024 20:58:24 +0100 Subject: [PATCH 04/16] fix(ds): Don't reverse entries in the atomic batch --- .../src/emqx_ds_replication_layer_egress.erl | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 3f9188312..1b2ac30dd 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -130,7 +130,7 @@ store_batch(DB, Messages, Opts) -> n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: reference(), - batch = [] :: [emqx_types:message()], + queue :: queue:queue(emqx_types:message()), pending_replies = [] :: [gen_server:from()] }). @@ -143,7 +143,8 @@ init([DB, Shard]) -> db = DB, shard = Shard, metrics_id = MetricsId, - tref = start_timer() + tref = start_timer(), + queue = queue:new() }, {ok, S}. @@ -151,7 +152,7 @@ handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, Fr do_enqueue(From, Sync, Msg, NBytes, S); handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, NBytes, Batch}, NBytes, S); + do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S); handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -177,63 +178,59 @@ terminate(_Reason, _S) -> -define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MAX, 5000). 
-do_flush(S = #s{batch = []}) -> - S#s{tref = start_timer()}; do_flush( - S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} + S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard} ) -> - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of + Messages = queue:to_list(Q), + case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of ok -> emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} ), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - true = erlang:garbage_collect(), - ok; + erlang:garbage_collect(), + S#s{ + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [], + tref = start_timer() + }; Error -> emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), - true = erlang:garbage_collect(), + erlang:garbage_collect(), ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, #{db => DB, shard => Shard, reason => Error} ), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - ok = timer:sleep(Cooldown), - %% Since we drop the entire batch here, we at least reply callers with an - %% error so they don't hang indefinitely in the `gen_server' call with - %% `infinity' timeout. - lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies) - end, - S#s{ - n = 0, - n_bytes = 0, - batch = [], - pending_replies = [], - tref = start_timer() - }. + S#s{ + tref = start_timer(Cooldown) + } + end. 
do_enqueue( From, Sync, MsgOrBatch, BatchBytes, - S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies} + S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies} ) -> NBytes = NBytes0 + BatchBytes, NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = case MsgOrBatch of {atomic, NumMsgs, Msgs} -> - S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch}; + Q = lists:foldl(fun queue:in/2, Q0, Msgs), + S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q}; Msg -> - S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]} + S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)} end, %% TODO: later we may want to delay the reply until the message is %% replicated, but it requies changes to the PUBACK/PUBREC flow to @@ -264,6 +261,9 @@ do_enqueue( start_timer() -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), + start_timer(Interval). + +start_timer(Interval) -> erlang:send_after(Interval, self(), ?flush). %% @doc Return approximate size of the MQTT message (it doesn't take From 0de255cac87bc3219029b4c3692ce57ebc6b939d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:15:31 +0100 Subject: [PATCH 05/16] feat(ds): Report egress flush time --- .../src/emqx_ds_replication_layer_egress.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 1b2ac30dd..9651c029e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -179,14 +179,18 @@ terminate(_Reason, _S) -> -define(COOLDOWN_MAX, 5000). 
do_flush( - S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard} + S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics} ) -> Messages = queue:to_list(Q), - case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of + T0 = erlang:monotonic_time(microsecond), + Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0), + case Result of ok -> - emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), - emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), - emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), + emqx_ds_builtin_metrics:inc_egress_batches(Metrics), + emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), + emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), ?tp( emqx_ds_replication_layer_egress_flush, From 75b092bf0ed2d65e1ed2687390c95154b218b3fc Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 22 Mar 2024 22:30:28 +0100 Subject: [PATCH 06/16] fix(ds): Actually retry sending batch --- .../src/emqx_ds_builtin_db_sup.erl | 10 +- .../src/emqx_ds_builtin_metrics.erl | 132 +++++++- .../src/emqx_ds_replication_layer_egress.erl | 286 +++++++++++------- .../src/emqx_ds_storage_layer.erl | 3 + .../test/emqx_ds_SUITE.erl | 103 ++++++- .../test/emqx_ds_test_helpers.erl | 11 +- apps/emqx_prometheus/src/emqx_prometheus.erl | 26 +- 7 files changed, 411 insertions(+), 160 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 79e2f6120..a697b9276 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -31,7 +31,7 @@ ensure_shard/1, ensure_egress/1 
]). --export([which_shards/1]). +-export([which_dbs/0, which_shards/1]). %% behaviour callbacks: -export([init/1]). @@ -104,6 +104,13 @@ ensure_egress(Shard) -> which_shards(DB) -> supervisor:which_children(?via(#?shards_sup{db = DB})). +%% @doc Return the list of builtin DS databases that are currently +%% active on the node. +-spec which_dbs() -> [emqx_ds:db()]. +which_dbs() -> + Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, + gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -111,6 +118,7 @@ which_shards(DB) -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), Children = [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 8075238b3..f0eac9652 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -15,11 +15,16 @@ %%-------------------------------------------------------------------- -module(emqx_ds_builtin_metrics). -%% API: +%% DS-facing API: -export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]). + +%% Prometheus-facing API: +-export([prometheus_meta/0, prometheus_collect/1]). + -export([ inc_egress_batches/1, inc_egress_batches_retry/1, + inc_egress_batches_failed/1, inc_egress_messages/2, inc_egress_bytes/2, observe_egress_flush_time/2 @@ -42,11 +47,12 @@ -define(DB_METRICS, []). 
-define(SHARD_METRICS, [ - 'egress.batches', - 'egress.batches.retry', - 'egress.messages', - 'egress.bytes', - {slide, 'egress.flush_time'} + {counter, 'emqx_ds_egress_batches'}, + {counter, 'emqx_ds_egress_batches_retry'}, + {counter, 'emqx_ds_egress_batches_failed'}, + {counter, 'emqx_ds_egress_messages'}, + {counter, 'emqx_ds_egress_bytes'}, + {slide, 'emqx_ds_egress_flush_time'} ]). -type shard_metrics_id() :: binary(). @@ -59,14 +65,16 @@ child_spec() -> emqx_metrics_worker:child_spec(?WORKER). +%% @doc Initialize metrics that are global for a DS database -spec init_for_db(emqx_ds:db()) -> ok. -init_for_db(DB) -> - emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). +init_for_db(_DB) -> + ok. -spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). shard_metric_id(DB, ShardId) -> iolist_to_binary([atom_to_list(DB), $/, ShardId]). +%% @doc Initialize metrics that are specific for the shard. -spec init_for_shard(shard_metrics_id()) -> ok. init_for_shard(ShardId) -> emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). @@ -74,28 +82,124 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches'). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches.retry'). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + +%% @doc Increase the number of time the egress worker encountered an +%% unrecoverable error while trying to flush the batch +-spec inc_egress_batches_failed(shard_metrics_id()) -> ok. 
+inc_egress_batches_failed(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.messages', NMessages). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.bytes', NMessages). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). -%% @doc Add a sample of time spent flushing the egress to the Raft log (in microseconds) +%% @doc Add a sample of elapsed time spent flushing the egress to the +%% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - emqx_metrics_worker:observe(?WORKER, Id, 'egress.flush_time', FlushTime). + emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + +prometheus_meta() -> + lists:map( + fun + ({counter, A}) -> + {A, counter, A}; + ({slide, A}) -> + {A, counter, A} + end, + ?SHARD_METRICS + ). + +prometheus_collect(NodeOrAggr) -> + prometheus_per_shard(NodeOrAggr). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_egress_batches => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}], +%% emqx_ds_egress_batches_retry => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}], +%% emqx_ds_egress_messages => +%% ... +%% } +%% ''' +%% +%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% labels. 
+prometheus_per_shard(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc0) -> + lists:foldl( + fun(Shard, Acc) -> + prometheus_per_shard(NodeOrAggr, DB, Shard, Acc) + end, + Acc0, + emqx_ds_replication_layer_meta:shards(DB) + ) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). + +prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) -> + Labels = [ + {db, DB}, + {shard, Shard} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics( + ?WORKER, shard_metric_id(DB, Shard) + ), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). + +-spec append_to_key(K, V, #{K => [V]}) -> #{K => [V]}. +append_to_key(Key, Value, Map) -> + maps:update_with( + Key, + fun(L) -> + [Value | L] + end, + [Value], + Map + ). + +slide_value(Interval, Value, Labels0) -> + Labels = [{interval, Interval} | Labels0], + {Labels, maps:get(Interval, Value, 0)}. %%================================================================================ %% Internal functions diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 9651c029e..667e1daa4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -51,13 +51,10 @@ -define(flush, flush). -record(enqueue_req, { - message :: emqx_types:message(), - sync :: boolean(), - payload_bytes :: non_neg_integer() -}). 
--record(enqueue_atomic_req, { - batch :: [emqx_types:message()], + messages :: [emqx_types:message()], sync :: boolean(), + atomic :: boolean(), + n_messages :: non_neg_integer(), payload_bytes :: non_neg_integer() }). @@ -70,53 +67,33 @@ start_link(DB, Shard) -> gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - ok. + emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> Sync = maps:get(sync, Opts, true), - case maps:get(atomic, Opts, false) of - false -> - lists:foreach( - fun(Message) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - gen_server:call( - ?via(DB, Shard), - #enqueue_req{ - message = Message, - sync = Sync, - payload_bytes = payload_size(Message) - }, - infinity - ) - end, - Messages + Atomic = maps:get(atomic, Opts, false), + %% Usually we expect all messages in the batch to go into the + %% single shard, so this function is optimized for the happy case. 
+ case shards_of_batch(DB, Messages) of + [{Shard, {NMsgs, NBytes}}] -> + %% Happy case: + gen_server:call( + ?via(DB, Shard), + #enqueue_req{ + messages = Messages, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + infinity ); - true -> - maps:foreach( - fun(Shard, Batch) -> - PayloadBytes = lists:foldl( - fun(Msg, Acc) -> - Acc + payload_size(Msg) - end, - 0, - Batch - ), - gen_server:call( - ?via(DB, Shard), - #enqueue_atomic_req{ - batch = Batch, - sync = Sync, - payload_bytes = PayloadBytes - }, - infinity - ) - end, - maps:groups_from_list( - fun(Message) -> - emqx_ds_replication_layer:shard_of_message(DB, Message, clientid) - end, - Messages - ) - ) + [_, _ | _] when Atomic -> + %% It's impossible to commit a batch to multiple shards + %% atomically + {error, unrecoverable, atomic_commit_to_multiple_shards}; + _Shards -> + %% Use a slower implementation for the unlikely case: + repackage_messages(DB, Messages, Sync, Atomic) end. %%================================================================================ @@ -129,7 +106,7 @@ store_batch(DB, Messages, Opts) -> metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), - tref :: reference(), + tref :: undefined | reference(), queue :: queue:queue(emqx_types:message()), pending_replies = [] :: [gen_server:from()] }). @@ -143,16 +120,18 @@ init([DB, Shard]) -> db = DB, shard = Shard, metrics_id = MetricsId, - tref = start_timer(), queue = queue:new() }, - {ok, S}. + {ok, start_timer(S)}. 
-handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) -> - do_enqueue(From, Sync, Msg, NBytes, S); -handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> - Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S); +handle_call( + #enqueue_req{ + messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes + }, + From, + S +) -> + {noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -160,7 +139,7 @@ handle_cast(_Cast, S) -> {noreply, S}. handle_info(?flush, S) -> - {noreply, do_flush(S)}; + {noreply, flush(S)}; handle_info(_Info, S) -> {noreply, S}. @@ -175,9 +154,60 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ +enqueue( + From, + Sync, + Atomic, + Msgs, + BatchSize, + BatchBytes, + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0} +) -> + %% At this point we don't split the batches, even when they aren't + %% atomic. It wouldn't win us anything in terms of memory, and + %% EMQX currently feeds data to DS in very small batches, so + %% granularity should be fine enough. + NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), + NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), + NMsgs = NMsgs0 + BatchSize, + NBytes = NBytes0 + BatchBytes, + case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of + true -> + %% Adding this batch would cause buffer to overflow. 
Flush
+            %% it now, and retry:
+            cancel_timer(S0),
+            S1 = flush(S0),
+            enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
+        false ->
+            %% The buffer is empty, we enqueue the atomic batch in its
+            %% entirety:
+            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
+            Replies =
+                case Sync of
+                    true ->
+                        [From | Replies0];
+                    false ->
+                        gen_server:reply(From, ok),
+                        Replies0
+                end,
+            S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies},
+            case NMsgs >= NMax orelse NBytes >= NBytesMax of
+                true ->
+                    cancel_timer(S1),
+                    flush(S1);
+                false ->
+                    S1
+            end
+    end.
+
 -define(COOLDOWN_MIN, 1000).
 -define(COOLDOWN_MAX, 5000).
 
+flush(S) ->
+    start_timer(do_flush(S)).
+
+do_flush(S0 = #s{n = 0}) ->
+    S0;
 do_flush(
     S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics}
 ) ->
@@ -202,73 +232,103 @@ do_flush(
                 n = 0,
                 n_bytes = 0,
                 queue = queue:new(),
-                pending_replies = [],
-                tref = start_timer()
+                pending_replies = []
             };
-        Error ->
-            emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id),
+        {error, recoverable, Reason} ->
+            %% Retry sending the batch:
+            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
             erlang:garbage_collect(),
+            %% We block the gen_server until the next retry.
+            BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            timer:sleep(BlockTime),
             ?tp(
                 warning,
                 emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, reason => Error}
+                #{db => DB, shard => Shard, reason => Reason}
             ),
-            Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
+            S;
+        Err = {error, unrecoverable, _} ->
+            emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
+            lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies),
+            erlang:garbage_collect(),
             S#s{
-                tref = start_timer(Cooldown)
+                n = 0,
+                n_bytes = 0,
+                queue = queue:new(),
+                pending_replies = []
             }
     end.
-do_enqueue( - From, - Sync, - MsgOrBatch, - BatchBytes, - S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies} -) -> - NBytes = NBytes0 + BatchBytes, - NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - S1 = - case MsgOrBatch of - {atomic, NumMsgs, Msgs} -> - Q = lists:foldl(fun queue:in/2, Q0, Msgs), - S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q}; - Msg -> - S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)} - end, - %% TODO: later we may want to delay the reply until the message is - %% replicated, but it requies changes to the PUBACK/PUBREC flow to - %% allow for async replies. For now, we ack when the message is - %% _buffered_ rather than stored. - %% - %% Otherwise, the client would freeze for at least flush interval, - %% or until the buffer is filled. - S2 = - case Sync of - true -> - S1#s{pending_replies = [From | Replies]}; - false -> - gen_server:reply(From, ok), - S1 - end, - S = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S2#s.tref), - do_flush(S2); - false -> - S2 - end, - %% TODO: add a backpressure mechanism for the server to avoid - %% building a long message queue. - {noreply, S}. +-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> + [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] +when + NMessages :: non_neg_integer(), + NBytes :: non_neg_integer(). +shards_of_batch(DB, Messages) -> + maps:to_list( + lists:foldl( + fun(Message, Acc) -> + %% TODO: sharding strategy must be part of the DS DB schema: + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S}) -> + {N + 1, S + Size} + end, + {1, Size}, + Acc + ) + end, + #{}, + Messages + ) + ). 
-start_timer() -> +repackage_messages(DB, Messages, Sync, Atomic) -> + Batches = lists:foldl( + fun(Message, Acc) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S, Msgs}) -> + {N + 1, S + Size, [Message | Msgs]} + end, + {1, Size, [Message]}, + Acc + ) + end, + #{}, + Messages + ), + maps:foreach( + fun(Shard, {NMsgs, ByteSize, RevMessages}) -> + gen_server:call( + ?via(DB, Shard), + #enqueue_req{ + messages = lists:reverse(RevMessages), + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = ByteSize + }, + infinity + ) + end, + Batches + ). + +start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - start_timer(Interval). + Tref = erlang:send_after(Interval, self(), ?flush), + S#s{tref = Tref}. -start_timer(Interval) -> - erlang:send_after(Interval, self(), ?flush). +cancel_timer(#s{tref = undefined}) -> + ok; +cancel_timer(#s{tref = TRef}) -> + _ = erlang:cancel_timer(TRef), + ok. %% @doc Return approximate size of the MQTT message (it doesn't take %% all things into account, for example headers and extras) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 6b85328b6..cbc0e9abf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -254,6 +254,9 @@ drop_shard(Shard) -> store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. 
+ ?tp(emqx_ds_storage_layer_store_batch, #{ + shard => Shard, messages => Messages, options => Options + }), #{module := Mod, data := GenData} = generation_at(Shard, Time), Mod:store_batch(Shard, GenData, Messages, Options); store_batch(_Shard, [], _Options) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 3df16dc1c..33988a974 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -322,17 +322,10 @@ t_09_atomic_store_batch(_Config) -> sync => true }) ), - - ok + {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}), + ?assertMatch(#{batch := [_, _, _]}, Flush) end, - fun(Trace) -> - %% Must contain exactly one flush with all messages. - ?assertMatch( - [#{batch := [_, _, _]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) - ), - ok - end + [] ), ok. @@ -355,14 +348,15 @@ t_10_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Should contain one flush per message. + Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)), + ?assertMatch([_], Batches), ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + [_, _, _], + lists:append(Batches) ), ok end @@ -681,10 +675,86 @@ t_error_mapping_replication_layer(_Config) -> length([error || {error, _, _} <- Results2]) > 0, Results2 ), - - snabbkaffe:stop(), meck:unload(). +%% This test suite verifies the behavior of `store_batch' operation +%% when the underlying code experiences recoverable or unrecoverable +%% problems. 
+t_store_batch_fail(_Config) -> + ?check_trace( + #{timetrap => 15_000}, + try + meck:new(emqx_ds_replication_layer, [passthrough, no_history]), + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), + %% Success: + Batch1 = [ + message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), + %% Inject unrecoverable error: + meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) -> + {error, unrecoverable, mock} + end), + Batch2 = [ + message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1) + ], + ?assertMatch( + {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) + ), + %% Inject a recoverable error: + Batch3 = [ + message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), + message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), + message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), + message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) + ], + meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) -> + try + ?tp(store_batch, #{messages => Messages}), + meck:passthrough([DB, Shard, Messages]) + catch + _:_ -> + {error, recoverable, mock} + end + end), + ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)), + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), + lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) + after + meck:unload() + end, + [ + {"number of successfull flushes after retry", fun(Trace) -> + ?assertMatch([_, _], ?of_kind(store_batch, Trace)) + end}, + {"number of retries", fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace)) + end}, + {"message ordering", fun(StoredMessages, _Trace) -> + [{_, Stream1}, {_, Stream2}] = StoredMessages, + ?assertMatch( + [ + #message{payload = <<"1">>}, + #message{payload = <<"2">>}, + #message{payload = <<"5">>}, + 
#message{payload = <<"7">>} + ], + Stream1 + ), + ?assertMatch( + [ + #message{payload = <<"6">>}, + #message{payload = <<"8">>} + ], + Stream2 + ) + end} + ] + ). + update_data_set() -> [ [ @@ -748,6 +818,7 @@ init_per_testcase(_TC, Config) -> Config. end_per_testcase(_TC, _Config) -> + snabbkaffe:stop(), ok = application:stop(emqx_durable_storage), mria:stop(), _ = mnesia:delete_schema([node()]), diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 44c45248b..4af1e9791 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -63,9 +63,16 @@ consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0). consume(DB, TopicFilter, StartTime) -> - Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), lists:flatmap( - fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end, + fun({_Stream, Msgs}) -> + Msgs + end, + consume_per_stream(DB, TopicFilter, StartTime)). + +consume_per_stream(DB, TopicFilter, StartTime) -> + Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), + lists:map( + fun({_Rank, Stream}) -> {Stream, consume_stream(DB, Stream, TopicFilter, StartTime)} end, Streams ). 
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 813f91dbd..044e701e3 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -212,7 +212,9 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), - ok = add_collect_family(Callback, ds_metric_meta(), ?MG(ds_data, RawData)), + ok = add_collect_family( + Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData) + ), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> @@ -265,6 +267,7 @@ fetch_from_local_node(Mode) -> emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode), + ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode), mria_data => mria_data(Mode) }}. @@ -481,7 +484,14 @@ emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])). 
+emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); +%% DS +emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators @@ -1012,18 +1022,6 @@ catch_all(DataFun) -> _:_ -> undefined end. -%%======================================== -%% Durable storge -%%======================================== - -ds_metric_meta() -> - [ - {emqx_ds_egress_batches, counter, 'egress.batches'}, - {emqx_ds_egress_batches_retry, counter, 'egress.batches.retry'}, - {emqx_ds_egress_messages, counter, 'egress.messages'}, - {emqx_ds_egress_bytes, counter, 'egress.bytes'} - ]. 
- %%-------------------------------------------------------------------- %% Collect functions %%-------------------------------------------------------------------- From b9ad241658f5cc283d35a39670f253433e27bc9b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 31 Mar 2024 15:44:31 +0200 Subject: [PATCH 07/16] feat(sessds): Add metrics for the number of persisted messages --- apps/emqx/src/emqx_metrics.erl | 5 ++++- apps/emqx/src/emqx_persistent_message.erl | 1 + apps/emqx_dashboard/include/emqx_dashboard.hrl | 6 ++++-- apps/emqx_dashboard/src/emqx_dashboard_monitor.erl | 3 ++- apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl | 4 ++++ apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl | 4 +++- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index 6b8b60209..13ac40c68 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -222,7 +222,9 @@ % Messages delivered {counter, 'messages.delivered'}, % Messages acked - {counter, 'messages.acked'} + {counter, 'messages.acked'}, + % Messages persistently stored + {counter, 'messages.persisted'} ] ). @@ -718,4 +720,5 @@ reserved_idx('overload_protection.gc') -> 403; reserved_idx('overload_protection.new_conn') -> 404; reserved_idx('messages.validation_succeeded') -> 405; reserved_idx('messages.validation_failed') -> 406; +reserved_idx('messages.persisted') -> 407; reserved_idx(_) -> undefined. diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index e3fa23296..10497216d 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -114,6 +114,7 @@ needs_persistence(Msg) -> -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result(). store_message(Msg) -> + emqx_metrics:inc('messages.persisted'), emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}). 
has_subscribers(#message{topic = Topic}) -> diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 13458b4b4..40f2ba2b3 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -67,7 +67,8 @@ %, sent_bytes validation_succeeded, validation_failed, - dropped + dropped, + persisted ]). -define(GAUGE_SAMPLER_LIST, [ @@ -87,7 +88,8 @@ sent => sent_msg_rate, validation_succeeded => validation_succeeded_rate, validation_failed => validation_failed_rate, - dropped => dropped_msg_rate + dropped => dropped_msg_rate, + persisted => persisted_rate }). -define(CURRENT_SAMPLE_NON_RATE, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 6a9e868dd..fe0476e6d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -428,7 +428,8 @@ stats(sent) -> emqx_metrics:val('messages.sent'); stats(sent_bytes) -> emqx_metrics:val('bytes.sent'); stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded'); stats(validation_failed) -> emqx_metrics:val('messages.validation_failed'); -stats(dropped) -> emqx_metrics:val('messages.dropped'). +stats(dropped) -> emqx_metrics:val('messages.dropped'); +stats(persisted) -> emqx_metrics:val('messages.persisted'). 
%% ------------------------------------------------------------------------------------------------- %% Retained && License Quota diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 3ffadc1b2..1b6773b87 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -192,6 +192,8 @@ swagger_desc(validation_succeeded) -> swagger_desc_format("Message validations succeeded "); swagger_desc(validation_failed) -> swagger_desc_format("Message validations failed "); +swagger_desc(persisted) -> + swagger_desc_format("Messages saved to the durable storage "); swagger_desc(subscriptions) -> <<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(topics) -> @@ -218,6 +220,8 @@ swagger_desc(validation_succeeded_rate) -> swagger_desc_format("Message validations succeeded ", per); swagger_desc(validation_failed_rate) -> swagger_desc_format("Message validations failed ", per); +swagger_desc(persisted_rate) -> + swagger_desc_format("Messages saved to the durable storage ", per); swagger_desc(retained_msg_count) -> <<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(shared_subscriptions) -> diff --git a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl index a2c2b96c2..841610985 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl @@ -89,7 +89,9 @@ init_per_group(new_config, Config) -> Apps = emqx_cth_suite:start( [ %% coverage olp metrics - {emqx, "overload_protection.enable = true"}, + {emqx, + "overload_protection.enable = true\n" + "session_persistence.enable = true"}, {emqx_license, "license.key = default"}, {emqx_prometheus, #{config => config(default)}} ], From f41e538526be8357cf73afc2a16be28670b54dd3 Mon Sep 17 00:00:00 2001 From: ieQu1 
<99872536+ieQu1@users.noreply.github.com> Date: Sun, 31 Mar 2024 16:58:51 +0200 Subject: [PATCH 08/16] feat(sessds): Observe next time --- .../src/emqx_ds_builtin_metrics.erl | 20 +++++++++++++++++-- .../src/emqx_ds_replication_layer.erl | 6 +++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index f0eac9652..fc6de2861 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -27,7 +27,9 @@ inc_egress_batches_failed/1, inc_egress_messages/2, inc_egress_bytes/2, - observe_egress_flush_time/2 + + observe_egress_flush_time/2, + observe_next_time/3 ]). %% behavior callbacks: @@ -46,7 +48,7 @@ -define(DB_METRICS, []). --define(SHARD_METRICS, [ +-define(EGRESS_METRICS, [ {counter, 'emqx_ds_egress_batches'}, {counter, 'emqx_ds_egress_batches_retry'}, {counter, 'emqx_ds_egress_batches_failed'}, @@ -55,6 +57,12 @@ {slide, 'emqx_ds_egress_flush_time'} ]). +-define(INGRESS_METRICS, [ + {slide, 'emqx_ds_builtin_next_time'} +]). + +-define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS). + -type shard_metrics_id() :: binary(). %%================================================================================ @@ -112,6 +120,14 @@ inc_egress_bytes(Id, NMessages) -> observe_egress_flush_time(Id, FlushTime) -> emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). +%% @doc Add a sample of elapsed time spent waiting for a +%% `emqx_ds_replication_layer:next' +-spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) -> + ok. +observe_next_time(DB, Shard, NextTime) -> + Id = shard_metric_id(DB, Shard), + emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime). 
+ prometheus_meta() -> lists:map( fun diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 14c2268b8..2d4982af3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -329,7 +329,11 @@ next(DB, Iter0, BatchSize) -> %% %% This kind of trickery should be probably done here in the %% replication layer. Or, perhaps, in the logic layer. - case ra_next(DB, Shard, StorageIter0, BatchSize) of + T0 = erlang:monotonic_time(microsecond), + Result = ra_next(DB, Shard, StorageIter0, BatchSize), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0), + case Result of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, Batch}; From b379f331de614d53b1e809cf3bff73c5f4bd1321 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 31 Mar 2024 18:00:48 +0200 Subject: [PATCH 09/16] fix(sessds): Handle errors when storing messages --- apps/emqx/src/emqx_broker.erl | 8 +- apps/emqx/src/emqx_persistent_message.erl | 2 +- .../src/emqx_ds_builtin_metrics.erl | 102 ++++++++++++++---- .../src/emqx_ds_replication_layer.erl | 2 +- .../src/emqx_ds_storage_layer.erl | 6 +- apps/emqx_prometheus/src/emqx_prometheus.erl | 4 +- 6 files changed, 95 insertions(+), 29 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 1470b7d8b..ed29ea614 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -253,8 +253,12 @@ persist_publish(Msg) -> case emqx_persistent_message:persist(Msg) of ok -> [persisted]; - {_SkipOrError, _Reason} -> - % TODO: log errors? + {skipped, _} -> + []; + {error, Recoverable, Reason} -> + ?SLOG(debug, #{ + msg => "failed_to_persist_message", is_recoverable => Recoverable, reason => Reason + }), [] end. 
diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 10497216d..c909c5c5f 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -98,7 +98,7 @@ pre_config_update(_Root, _NewConf, _OldConf) -> %%-------------------------------------------------------------------- -spec persist(emqx_types:message()) -> - ok | {skipped, _Reason} | {error, _TODO}. + emqx_ds:store_batch_result() | {skipped, needs_no_persistence}. persist(Msg) -> ?WHEN_ENABLED( case needs_persistence(Msg) andalso has_subscribers(Msg) of diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index fc6de2861..833e39211 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -29,7 +29,10 @@ inc_egress_bytes/2, observe_egress_flush_time/2, - observe_next_time/3 + + observe_store_batch_time/2, + + observe_next_time/2 ]). %% behavior callbacks: @@ -46,7 +49,15 @@ -define(WORKER, ?MODULE). --define(DB_METRICS, []). +-define(STORAGE_LAYER_METRICS, [ + {slide, 'emqx_ds_store_batch_time'} +]). + +-define(FETCH_METRICS, [ + {slide, 'emqx_ds_builtin_next_time'} +]). + +-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS). -define(EGRESS_METRICS, [ {counter, 'emqx_ds_egress_batches'}, @@ -57,14 +68,12 @@ {slide, 'emqx_ds_egress_flush_time'} ]). --define(INGRESS_METRICS, [ - {slide, 'emqx_ds_builtin_next_time'} -]). - --define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS). +-define(SHARD_METRICS, ?EGRESS_METRICS). -type shard_metrics_id() :: binary(). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). 
+ %%================================================================================ %% API functions %%================================================================================ @@ -75,8 +84,8 @@ child_spec() -> %% @doc Initialize metrics that are global for a DS database -spec init_for_db(emqx_ds:db()) -> ok. -init_for_db(_DB) -> - ok. +init_for_db(DB) -> + emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). -spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). shard_metric_id(DB, ShardId) -> @@ -90,43 +99,45 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). %% @doc Increase the number of time the egress worker encountered an %% unrecoverable error while trying to flush the batch -spec inc_egress_batches_failed(shard_metrics_id()) -> ok. inc_egress_batches_failed(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). 
%% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). %% @doc Add a sample of elapsed time spent flushing the egress to the %% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). -%% @doc Add a sample of elapsed time spent waiting for a +-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +observe_store_batch_time({DB, _}, StoreTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime). + +%% @doc Add a sample of elapsed time spent waiting for a batch %% `emqx_ds_replication_layer:next' --spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) -> - ok. -observe_next_time(DB, Shard, NextTime) -> - Id = shard_metric_id(DB, Shard), - emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime). +-spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok. +observe_next_time(DB, NextTime) -> + catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime). prometheus_meta() -> lists:map( @@ -136,11 +147,56 @@ prometheus_meta() -> ({slide, A}) -> {A, counter, A} end, - ?SHARD_METRICS + ?DB_METRICS ++ ?SHARD_METRICS ). prometheus_collect(NodeOrAggr) -> - prometheus_per_shard(NodeOrAggr). + maps:merge(prometheus_per_db(NodeOrAggr), prometheus_per_shard(NodeOrAggr)). 
+ +prometheus_per_db(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc) -> + prometheus_per_db(NodeOrAggr, DB, Acc) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_store_batch_time => +%% [{[{db, emqx_persistent_message}], 42}], +%% ... +%% ''' +%% +%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% labels. +prometheus_per_db(NodeOrAggr, DB, Acc0) -> + Labels = [ + {db, DB} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(?WORKER, DB), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). 
%% This function returns the data in the following format: %% ``` diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 2d4982af3..f8478bb72 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -332,7 +332,7 @@ next(DB, Iter0, BatchSize) -> T0 = erlang:monotonic_time(microsecond), Result = ra_next(DB, Shard, StorageIter0, BatchSize), T1 = erlang:monotonic_time(microsecond), - emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0), + emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0), case Result of {ok, StorageIter, Batch} -> Iter = Iter0#{?enc := StorageIter}, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index cbc0e9abf..28ce1d943 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -258,7 +258,11 @@ store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> shard => Shard, messages => Messages, options => Options }), #{module := Mod, data := GenData} = generation_at(Shard, Time), - Mod:store_batch(Shard, GenData, Messages, Options); + T0 = erlang:monotonic_time(microsecond), + Result = Mod:store_batch(Shard, GenData, Messages, Options), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result; store_batch(_Shard, [], _Options) -> ok. 
diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 044e701e3..0ac032824 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -491,7 +491,9 @@ emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [ emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators From f14c253dea4b8af720b2fdb6fd0b02af617695d5 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 1 Apr 2024 23:41:29 +0200 Subject: [PATCH 10/16] fix(prometheus): Don't add DS metrics when feature is disabled --- apps/emqx_prometheus/src/emqx_prometheus.erl | 25 +++++++++++++++---- .../test/emqx_prometheus_SUITE.erl | 4 +-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 0ac032824..2327a7263 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -212,14 +212,30 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), - ok = add_collect_family( - Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData) - ), + ok 
= maybe_add_ds_collect_family(Callback, RawData), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> ok. +maybe_add_ds_collect_family(Callback, RawData) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + add_collect_family( + Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData) + ); + false -> + ok + end. + +maybe_collect_ds_data(Mode) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + #{ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode)}; + false -> + #{} + end. + %% @private collect(<<"json">>) -> RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), @@ -254,7 +270,7 @@ add_collect_family(Name, Data, Callback, Type) -> %% behaviour fetch_from_local_node(Mode) -> - {node(), #{ + {node(), (maybe_collect_ds_data(Mode))#{ stats_data => stats_data(Mode), vm_data => vm_data(Mode), cluster_data => cluster_data(Mode), @@ -267,7 +283,6 @@ fetch_from_local_node(Mode) -> emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode), - ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode), mria_data => mria_data(Mode) }}. 
diff --git a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl index 841610985..a2c2b96c2 100644 --- a/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl +++ b/apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl @@ -89,9 +89,7 @@ init_per_group(new_config, Config) -> Apps = emqx_cth_suite:start( [ %% coverage olp metrics - {emqx, - "overload_protection.enable = true\n" - "session_persistence.enable = true"}, + {emqx, "overload_protection.enable = true"}, {emqx_license, "license.key = default"}, {emqx_prometheus, #{config => config(default)}} ], From 94ca7ad0f86c6f924351f3852cf35bfb66b8bb02 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 01:52:34 +0200 Subject: [PATCH 11/16] feat(ds): Report counters for LTS storage layout --- apps/emqx_durable_storage/include/emqx_ds.hrl | 4 +- .../include/emqx_ds_metrics.hrl | 49 ++++++++++++++++ .../src/emqx_ds_builtin_metrics.erl | 55 ++++++++++++------ .../src/emqx_ds_storage_bitfield_lts.erl | 57 ++++++++++++------- .../test/emqx_ds_SUITE.erl | 2 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 22 ++++--- apps/emqx_prometheus/src/emqx_prometheus.erl | 20 ++++--- 7 files changed, 150 insertions(+), 59 deletions(-) create mode 100644 apps/emqx_durable_storage/include/emqx_ds_metrics.hrl diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl index f24605175..cc7a7431f 100644 --- a/apps/emqx_durable_storage/include/emqx_ds.hrl +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -13,7 +13,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- --ifndef(EMQX_DS_HRL_HRL). --define(EMQX_DS_HRL_HRL, true). +-ifndef(EMQX_DS_HRL). +-define(EMQX_DS_HRL, true). -endif. 
diff --git a/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl new file mode 100644 index 000000000..0a82a6682 --- /dev/null +++ b/apps/emqx_durable_storage/include/emqx_ds_metrics.hrl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_METRICS_HRL). +-define(EMQX_DS_METRICS_HRL, true). + +%%%% Egress metrics: + +%% Number of successfully flushed batches: +-define(DS_EGRESS_BATCHES, emqx_ds_egress_batches). +%% Number of batch flush retries: +-define(DS_EGRESS_BATCHES_RETRY, emqx_ds_egress_batches_retry). +%% Number of batches that weren't flushed due to unrecoverable errors: +-define(DS_EGRESS_BATCHES_FAILED, emqx_ds_egress_batches_failed). +%% Total number of messages that were successfully committed to the storage: +-define(DS_EGRESS_MESSAGES, emqx_ds_egress_messages). +%% Total size of payloads that were successfully committed to the storage: +-define(DS_EGRESS_BYTES, emqx_ds_egress_bytes). +%% Sliding average of flush time (microseconds): +-define(DS_EGRESS_FLUSH_TIME, emqx_ds_egress_flush_time). + +%%%% Storage layer metrics: +-define(DS_STORE_BATCH_TIME, emqx_ds_store_batch_time). +-define(DS_BUILTIN_NEXT_TIME, emqx_ds_builtin_next_time). 
+
+%%% LTS Storage counters:
+
+%% This counter is incremented when the iterator seeks to the next interval:
+-define(DS_LTS_SEEK_COUNTER, emqx_ds_storage_bitfield_lts_counter_seek).
+%% This counter is incremented when the iterator proceeds to the next
+%% key within the interval (this is the best-case scenario):
+-define(DS_LTS_NEXT_COUNTER, emqx_ds_storage_bitfield_lts_counter_next).
+%% This counter is incremented when the key passes the bitmask check, but
+%% the value is rejected by the subsequent post-processing:
+-define(DS_LTS_COLLISION_COUNTER, emqx_ds_storage_bitfield_lts_counter_collision).
+
+-endif.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl
index 833e39211..ce984db57 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl
@@ -32,7 +32,11 @@
     observe_store_batch_time/2,
 
-    observe_next_time/2
+    observe_next_time/2,
+
+    inc_lts_seek_counter/2,
+    inc_lts_next_counter/2,
+    inc_lts_collision_counter/2
 ]).
 
 %% behavior callbacks:
@@ -43,6 +47,8 @@
 
 -export_type([shard_metrics_id/0]).
 
+-include("emqx_ds_metrics.hrl").
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -50,22 +56,25 @@
 -define(WORKER, ?MODULE).
 
 -define(STORAGE_LAYER_METRICS, [
-    {slide, 'emqx_ds_store_batch_time'}
+    {slide, ?DS_STORE_BATCH_TIME},
+    {counter, ?DS_LTS_SEEK_COUNTER},
+    {counter, ?DS_LTS_NEXT_COUNTER},
+    {counter, ?DS_LTS_COLLISION_COUNTER}
 ]).
 
 -define(FETCH_METRICS, [
-    {slide, 'emqx_ds_builtin_next_time'}
+    {slide, ?DS_BUILTIN_NEXT_TIME}
 ]).
 
 -define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
-define(EGRESS_METRICS, [ - {counter, 'emqx_ds_egress_batches'}, - {counter, 'emqx_ds_egress_batches_retry'}, - {counter, 'emqx_ds_egress_batches_failed'}, - {counter, 'emqx_ds_egress_messages'}, - {counter, 'emqx_ds_egress_bytes'}, - {slide, 'emqx_ds_egress_flush_time'} + {counter, ?DS_EGRESS_BATCHES}, + {counter, ?DS_EGRESS_BATCHES_RETRY}, + {counter, ?DS_EGRESS_BATCHES_FAILED}, + {counter, ?DS_EGRESS_MESSAGES}, + {counter, ?DS_EGRESS_BYTES}, + {slide, ?DS_EGRESS_FLUSH_TIME} ]). -define(SHARD_METRICS, ?EGRESS_METRICS). @@ -99,45 +108,57 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_RETRY). %% @doc Increase the number of time the egress worker encountered an %% unrecoverable error while trying to flush the batch -spec inc_egress_batches_failed(shard_metrics_id()) -> ok. inc_egress_batches_failed(Id) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BATCHES_FAILED). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_MESSAGES, NMessages). 
%% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). + catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_EGRESS_BYTES, NMessages). %% @doc Add a sample of elapsed time spent flushing the egress to the %% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_EGRESS_FLUSH_TIME, FlushTime). -spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. observe_store_batch_time({DB, _}, StoreTime) -> - catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime). + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_STORE_BATCH_TIME, StoreTime). %% @doc Add a sample of elapsed time spent waiting for a batch %% `emqx_ds_replication_layer:next' -spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok. observe_next_time(DB, NextTime) -> - catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime). + catch emqx_metrics_worker:observe(?WORKER, DB, ?DS_BUILTIN_NEXT_TIME, NextTime). + +-spec inc_lts_seek_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_seek_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_SEEK_COUNTER, Inc). + +-spec inc_lts_next_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. +inc_lts_next_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_NEXT_COUNTER, Inc). + +-spec inc_lts_collision_counter(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok. 
+inc_lts_collision_counter({DB, _}, Inc) -> + catch emqx_metrics_worker:inc(?WORKER, DB, ?DS_LTS_COLLISION_COUNTER, Inc). prometheus_meta() -> lists:map( diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 594854d21..2ec6674b6 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -44,6 +44,7 @@ -export_type([options/0]). +-include("emqx_ds_metrics.hrl"). -include_lib("emqx_utils/include/emqx_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -115,8 +116,6 @@ ?last_seen_key := binary() }. --define(COUNTER, emqx_ds_storage_bitfield_lts_counter). - %% Limit on the number of wildcard levels in the learned topic trie: -define(WILDCARD_LIMIT, 10). @@ -140,6 +139,8 @@ -define(DIM_TOPIC, 1). -define(DIM_TS, 2). +-define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -347,13 +348,18 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> +next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_ds:timestamp_us(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - next_until(Schema, It, SafeCutoffTime, BatchSize). + try + next_until(Schema, It, SafeCutoffTime, BatchSize) + after + report_counters(Shard) + end. 
next_until(_Schema, It = #{?tag := ?IT, ?start_time := StartTime}, SafeCutoffTime, _BatchSize) when StartTime >= SafeCutoffTime @@ -375,20 +381,23 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, filter := Filter } = prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers), try - put(?COUNTER, 0), next_loop(ITHandle, Keymapper, Filter, SafeCutoffTime, It, [], BatchSize) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. -delete_next(_Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. + init_counters(), Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, - delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize). + try + delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) + after + report_counters(Shard) + end. delete_next_until( _Schema, @@ -417,7 +426,6 @@ delete_next_until( DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Keymappers ), try - put(?COUNTER, 0), LoopContext = LoopContext0#{ db => DB, cf => CF, @@ -430,8 +438,7 @@ delete_next_until( }, delete_next_loop(LoopContext) after - rocksdb:iterator_close(ITHandle), - erase(?COUNTER) + rocksdb:iterator_close(ITHandle) end. 
%%================================================================================ @@ -477,7 +484,6 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key next_loop(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {ok, It, lists:reverse(Acc)}; next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> - inc_counter(), #{?tag := ?IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -485,6 +491,7 @@ next_loop(ITHandle, KeyMapper, Filter, Cutoff, It0, Acc0, N0) -> Key1 -> %% assert true = Key1 > Key0, + inc_counter(?DS_LTS_SEEK_COUNTER), case rocksdb:iterator_move(ITHandle, {seek, Key1}) of {ok, Key, Val} -> {N, It, Acc} = traverse_interval( @@ -510,6 +517,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - Acc = [{Key, Msg} | Acc0], traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N - 1); false -> + inc_counter(?DS_LTS_COLLISION_COUNTER), traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc0, N) end; overflow -> @@ -521,7 +529,7 @@ traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It0, Acc0, N) - traverse_interval(_ITHandle, _KeyMapper, _Filter, _Cutoff, It, Acc, 0) -> {0, It, Acc}; traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, It, Acc, N) -> - inc_counter(), + inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> traverse_interval(ITHandle, KeyMapper, Filter, Cutoff, Key, Val, It, Acc, N); @@ -541,7 +549,7 @@ delete_next_loop(LoopContext0) -> iterated_over := AccIter0, it_handle := ITHandle } = LoopContext0, - inc_counter(), + inc_counter(?DS_LTS_SEEK_COUNTER), #{?tag := ?DELETE_IT, ?last_seen_key := Key0} = It0, case emqx_ds_bitmask_keymapper:bin_increment(Filter, Key0) of overflow -> @@ -623,7 +631,7 @@ delete_traverse_interval1(LoopContext0) -> iterated_over := AccIter, storage_iter := It } = LoopContext0, - inc_counter(), + 
inc_counter(?DS_LTS_NEXT_COUNTER), case rocksdb:iterator_move(ITHandle, next) of {ok, Key, Val} -> delete_traverse_interval(LoopContext0#{ @@ -767,9 +775,20 @@ read_persisted_trie(IT, {ok, KeyB, ValB}) -> read_persisted_trie(_IT, {error, invalid_iterator}) -> []. -inc_counter() -> - N = get(?COUNTER), - put(?COUNTER, N + 1). +inc_counter(Counter) -> + N = get(Counter), + put(Counter, N + 1). + +init_counters() -> + _ = [put(I, 0) || I <- ?DS_LTS_COUNTERS], + ok. + +report_counters(Shard) -> + emqx_ds_builtin_metrics:inc_lts_seek_counter(Shard, get(?DS_LTS_SEEK_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_next_counter(Shard, get(?DS_LTS_NEXT_COUNTER)), + emqx_ds_builtin_metrics:inc_lts_collision_counter(Shard, get(?DS_LTS_COLLISION_COUNTER)), + _ = [erase(I) || I <- ?DS_LTS_COUNTERS], + ok. %% @doc Generate a column family ID for the MQTT messages -spec data_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 33988a974..727f424b8 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -677,7 +677,7 @@ t_error_mapping_replication_layer(_Config) -> ), meck:unload(). -%% This test suite verifies the behavior of `store_batch' operation +%% This testcase verifies the behavior of `store_batch' operation %% when the underlying code experiences recoverable or unrecoverable %% problems. 
t_store_batch_fail(_Config) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 636b57b89..78838e675 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -261,8 +261,7 @@ t_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Must contain exactly one flush with all messages. @@ -293,19 +292,18 @@ t_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + Msgs end, - fun(Trace) -> - %% Should contain one flush per message. - ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + fun(ExpectedMsgs, Trace) -> + ProcessedMsgs = lists:append( + ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)) ), - ok + ?assertEqual( + ExpectedMsgs, + ProcessedMsgs + ) end - ), - ok. + ). check(Shard, TopicFilter, StartTime, ExpectedMessages) -> ExpectedFiltered = lists:filter( diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 2327a7263..450033f18 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -37,6 +37,7 @@ -include_lib("public_key/include/public_key.hrl"). -include_lib("prometheus/include/prometheus_model.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_durable_storage/include/emqx_ds_metrics.hrl"). 
-import( prometheus_model_helpers, @@ -501,14 +502,17 @@ emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []) emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); %% DS -emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = ?DS_EGRESS_BATCHES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_RETRY, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BATCHES_FAILED, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_MESSAGES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_BYTES, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_EGRESS_FLUSH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_STORE_BATCH_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_BUILTIN_NEXT_TIME, D) -> gauge_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_SEEK_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_NEXT_COUNTER, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = ?DS_LTS_COLLISION_COUNTER, D) -> counter_metrics(?MG(K, D, [])). 
%%-------------------------------------------------------------------- %% Indicators From 4382971443548ed5f92bfb77eb644bf2805eb25c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 10:45:13 +0200 Subject: [PATCH 12/16] fix(ds): Preserve errors in the egress --- .../src/emqx_ds_replication_layer_egress.erl | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 667e1daa4..72b0a468b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -302,9 +302,9 @@ repackage_messages(DB, Messages, Sync, Atomic) -> #{}, Messages ), - maps:foreach( - fun(Shard, {NMsgs, ByteSize, RevMessages}) -> - gen_server:call( + maps:fold( + fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> + Err = gen_server:call( ?via(DB, Shard), #enqueue_req{ messages = lists:reverse(RevMessages), @@ -314,11 +314,22 @@ repackage_messages(DB, Messages, Sync, Atomic) -> payload_bytes = ByteSize }, infinity - ) + ), + compose_errors(ErrAcc, Err) end, + ok, Batches ). +compose_errors(ErrAcc, ok) -> + ErrAcc; +compose_errors(ok, Err) -> + Err; +compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> + {error, unrecoverable, Err}; +compose_errors(ErrAcc, _Err) -> + ErrAcc. 
+ start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Tref = erlang:send_after(Interval, self(), ?flush), From ae5935e7f76eca57fab15459cd95537e3584cf6c Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 17:41:38 +0200 Subject: [PATCH 13/16] test(ds): Attempt to stabilize metrics_worker tests in CI --- .../src/emqx_ds_replication_layer_egress.erl | 1 - .../test/emqx_ds_test_helpers.erl | 9 +++--- .../test/emqx_metrics_worker_SUITE.erl | 5 ++-- changes/ce/feat-12781.en.md | 29 +++++++++++++++++++ 4 files changed, 36 insertions(+), 8 deletions(-) create mode 100644 changes/ce/feat-12781.en.md diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 72b0a468b..bc0765f27 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -221,7 +221,6 @@ do_flush( emqx_ds_builtin_metrics:inc_egress_batches(Metrics), emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), - lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), ?tp( emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages} diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 4af1e9791..be4f7bcdf 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -64,10 +64,11 @@ consume(DB, TopicFilter) -> consume(DB, TopicFilter, StartTime) -> lists:flatmap( - fun({_Stream, Msgs}) -> - Msgs - end, - consume_per_stream(DB, TopicFilter, StartTime)). + fun({_Stream, Msgs}) -> + Msgs + end, + consume_per_stream(DB, TopicFilter, StartTime) + ). 
 consume_per_stream(DB, TopicFilter, StartTime) ->
     Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime),
diff --git a/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl b/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl
index 387e069cf..15866feb0 100644
--- a/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl
+++ b/apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl
@@ -31,18 +31,17 @@ suite() ->
 -define(NAME, ?MODULE).
 
 init_per_suite(Config) ->
-    {ok, _} = emqx_metrics_worker:start_link(?NAME),
     Config.
 
 end_per_suite(_Config) ->
-    ok = emqx_metrics_worker:stop(?NAME).
+    ok.
 
 init_per_testcase(_, Config) ->
-    ok = emqx_metrics_worker:stop(?NAME),
     {ok, _} = emqx_metrics_worker:start_link(?NAME),
     Config.
 
 end_per_testcase(_, _Config) ->
+    ok = emqx_metrics_worker:stop(?NAME),
     ok.
 
 t_get_metrics(_) ->
diff --git a/changes/ce/feat-12781.en.md b/changes/ce/feat-12781.en.md
new file mode 100644
index 000000000..c884ccbc4
--- /dev/null
+++ b/changes/ce/feat-12781.en.md
@@ -0,0 +1,29 @@
+Added metrics related to EMQX durable storage to Prometheus.
+
+New metrics:
+
+- `emqx_ds_egress_batches`
+
+- `emqx_ds_egress_batches_retry`
+
+- `emqx_ds_egress_batches_failed`
+
+- `emqx_ds_egress_messages`
+
+- `emqx_ds_egress_bytes`
+
+- `emqx_ds_egress_flush_time`
+
+- `emqx_ds_store_batch_time`
+
+- `emqx_ds_builtin_next_time`
+
+- `emqx_ds_storage_bitfield_lts_counter_seek`
+
+- `emqx_ds_storage_bitfield_lts_counter_next`
+
+- `emqx_ds_storage_bitfield_lts_counter_collision`
+
+Note: these metrics are only visible when session persistence is enabled.
+
+The number of persisted messages has also been added to the dashboard.
From 92ca90c0ca54c4c4f42d36d1e1a6bbdffaf32641 Mon Sep 17 00:00:00 2001
From: ieQu1 <99872536+ieQu1@users.noreply.github.com>
Date: Tue, 2 Apr 2024 22:29:10 +0200
Subject: [PATCH 14/16] fix(ds): Improve egress logging

---
 .../src/emqx_ds_replication_layer_egress.erl  | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
index bc0765f27..eb4b1fc70 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
@@ -114,6 +114,7 @@ store_batch(DB, Messages, Opts) ->
 init([DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
+    logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
     MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
     ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
     S = #s{
@@ -234,19 +235,29 @@ do_flush(
                 pending_replies = []
             };
         {error, recoverable, Reason} ->
+            %% Note: this is a hot loop, so we report error messages
+            %% with `debug' level to avoid wiping the logs. Instead,
+            %% error detection must rely on the metrics. Debug
+            %% logging can be enabled for the particular egress server
+            %% via logger domain.
+            ?tp(
+                debug,
+                emqx_ds_replication_layer_egress_flush_failed,
+                #{db => DB, shard => Shard, reason => Reason, recoverable => true}
+            ),
             %% Retry sending the batch:
            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
            erlang:garbage_collect(),
            %% We block the gen_server until the next retry.
BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), timer:sleep(BlockTime), - ?tp( - warning, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason} - ), S; - Err = {error, unrecoverable, _} -> + Err = {error, unrecoverable, Reason} -> + ?tp( + debug, + emqx_ds_replication_layer_egress_flush_failed, + #{db => DB, shard => Shard, reason => Reason, recoverable => false} + ), emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies), erlang:garbage_collect(), From 2bbfada7af8a2fa6b2253adb1bdaf7c182b8f309 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 2 Apr 2024 23:02:47 +0200 Subject: [PATCH 15/16] fix(ds): Make async batches truly async --- .../src/emqx_ds_replication_layer_egress.erl | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index eb4b1fc70..f328c7a99 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -76,7 +76,7 @@ store_batch(DB, Messages, Opts) -> case shards_of_batch(DB, Messages) of [{Shard, {NMsgs, NBytes}}] -> %% Happy case: - gen_server:call( + enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ messages = Messages, @@ -84,8 +84,7 @@ store_batch(DB, Messages, Opts) -> atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes - }, - infinity + } ); [_, _ | _] when Atomic -> %% It's impossible to commit a batch to multiple shards @@ -93,7 +92,7 @@ store_batch(DB, Messages, Opts) -> {error, unrecoverable, atomic_commit_to_multiple_shards}; _Shards -> %% Use a slower implementation for the unlikely case: - repackage_messages(DB, Messages, Sync, Atomic) + repackage_messages(DB, Messages, Sync) end. 
%%================================================================================ @@ -127,15 +126,31 @@ init([DB, Shard]) -> handle_call( #enqueue_req{ - messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes }, From, - S + S0 = #s{pending_replies = Replies0} ) -> - {noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)}; + S = S0#s{pending_replies = [From | Replies0]}, + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. +handle_cast( + #enqueue_req{ + messages = Msgs, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + S +) -> + {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_cast(_Cast, S) -> {noreply, S}. @@ -156,13 +171,12 @@ terminate(_Reason, _S) -> %%================================================================================ enqueue( - From, Sync, Atomic, Msgs, BatchSize, BatchBytes, - S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0} + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} ) -> %% At this point we don't split the batches, even when they aren't %% atomic. 
It wouldn't win us anything in terms of memory, and @@ -178,20 +192,12 @@ enqueue( %% it now, and retry: cancel_timer(S0), S1 = flush(S0), - enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); false -> %% The buffer is empty, we enqueue the atomic batch in its %% entirety: Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), - Replies = - case Sync of - true -> - [From | Replies0]; - false -> - gen_server:reply(From, ok), - Replies0 - end, - S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies}, + S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, case NMsgs >= NMax orelse NBytes >= NBytes of true -> cancel_timer(S1), @@ -295,7 +301,7 @@ shards_of_batch(DB, Messages) -> ) ). -repackage_messages(DB, Messages, Sync, Atomic) -> +repackage_messages(DB, Messages, Sync) -> Batches = lists:foldl( fun(Message, Acc) -> Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), @@ -314,16 +320,15 @@ repackage_messages(DB, Messages, Sync, Atomic) -> ), maps:fold( fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> - Err = gen_server:call( + Err = enqueue_call_or_cast( ?via(DB, Shard), #enqueue_req{ messages = lists:reverse(RevMessages), sync = Sync, - atomic = Atomic, + atomic = false, n_messages = NMsgs, payload_bytes = ByteSize - }, - infinity + } ), compose_errors(ErrAcc, Err) end, @@ -331,6 +336,11 @@ repackage_messages(DB, Messages, Sync, Atomic) -> Batches ). +enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) -> + gen_server:call(To, Req, infinity); +enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) -> + gen_server:cast(To, Req). 
+ compose_errors(ErrAcc, ok) -> ErrAcc; compose_errors(ok, Err) -> From f37ed3a40a93896d3be2e1e58b8e073b214ec7b9 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 3 Apr 2024 13:57:16 +0200 Subject: [PATCH 16/16] fix(ds): Limit the number of retries in egress to 0 --- .../src/emqx_ds_replication_layer.erl | 2 + .../src/emqx_ds_replication_layer_egress.erl | 40 ++++++++++++++----- .../test/emqx_ds_SUITE.erl | 35 ++++++++-------- 3 files changed, 49 insertions(+), 28 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index f8478bb72..ecc6a492e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -551,6 +551,8 @@ list_nodes() -> end ). +-spec ra_store_batch(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), [emqx_types:message()]) -> + ok | {timeout, _} | {error, recoverable | unrecoverable, _Err} | _Err. ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index f328c7a99..4122d937d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -103,6 +103,11 @@ store_batch(DB, Messages, Opts) -> db :: emqx_ds:db(), shard :: emqx_ds_replication_layer:shard_id(), metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), + n_retries = 0 :: non_neg_integer(), + %% FIXME: Currently max_retries is always 0, because replication + %% layer doesn't guarantee idempotency. Retrying would create + %% duplicate messages. 
+ max_retries = 0 :: non_neg_integer(), n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), tref :: undefined | reference(), @@ -216,7 +221,15 @@ flush(S) -> do_flush(S0 = #s{n = 0}) -> S0; do_flush( - S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics} + S = #s{ + queue = Q, + pending_replies = Replies, + db = DB, + shard = Shard, + metrics_id = Metrics, + n_retries = Retries, + max_retries = MaxRetries + } ) -> Messages = queue:to_list(Q), T0 = erlang:monotonic_time(microsecond), @@ -240,7 +253,7 @@ do_flush( queue = queue:new(), pending_replies = [] }; - {error, recoverable, Reason} -> + {timeout, ServerId} when Retries < MaxRetries -> %% Note: this is a hot loop, so we report error messages %% with `debug' level to avoid wiping the logs. Instead, %% error the detection must rely on the metrics. Debug @@ -248,8 +261,8 @@ do_flush( %% via logger domain. ?tp( debug, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason, recoverable => true} + emqx_ds_replication_layer_egress_flush_retry, + #{db => DB, shard => Shard, reason => timeout, server_id => ServerId} ), %% Retry sending the batch: emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), @@ -257,21 +270,30 @@ do_flush( %% We block the gen_server until the next retry. 
BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), timer:sleep(BlockTime), - S; - Err = {error, unrecoverable, Reason} -> + S#s{n_retries = Retries + 1}; + Err -> ?tp( debug, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Reason, recoverable => false} + #{db => DB, shard => Shard, error => Err} ), emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), - lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies), + Reply = + case Err of + {error, _, _} -> Err; + {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}}; + _ -> {error, unrecoverable, Err} + end, + lists:foreach( + fun(From) -> gen_server:reply(From, Reply) end, Replies + ), erlang:garbage_collect(), S#s{ n = 0, n_bytes = 0, queue = queue:new(), - pending_replies = [] + pending_replies = [], + n_retries = 0 } end. diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 727f424b8..1d2daacbb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -684,7 +684,7 @@ t_store_batch_fail(_Config) -> ?check_trace( #{timetrap => 15_000}, try - meck:new(emqx_ds_replication_layer, [passthrough, no_history]), + meck:new(emqx_ds_storage_layer, [passthrough, no_history]), DB = ?FUNCTION_NAME, ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), %% Success: @@ -694,7 +694,7 @@ t_store_batch_fail(_Config) -> ], ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), %% Inject unrecoverable error: - meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) -> + meck:expect(emqx_ds_storage_layer, store_batch, fun(_DB, _Shard, _Messages) -> {error, unrecoverable, mock} end), Batch2 = [ @@ -704,35 +704,32 @@ t_store_batch_fail(_Config) -> ?assertMatch( {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) ), - %% Inject a recoverable error: + 
meck:unload(emqx_ds_storage_layer), + %% Inject a recoverable error: + meck:new(ra, [passthrough, no_history]), + meck:expect(ra, process_command, fun(Servers, Shard, Command) -> + ?tp(ra_command, #{servers => Servers, shard => Shard, command => Command}), + {timeout, mock} + end), Batch3 = [ message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) ], - meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) -> - try - ?tp(store_batch, #{messages => Messages}), - meck:passthrough([DB, Shard, Messages]) - catch - _:_ -> - {error, recoverable, mock} - end - end), - ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)), + %% Note: due to idempotency issues the number of retries + %% is currently set to 0: + ?assertMatch( + {error, recoverable, {timeout, mock}}, + emqx_ds:store_batch(DB, Batch3, #{sync => true}) + ), + meck:unload(ra), ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) after meck:unload() end, [ - {"number of successfull flushes after retry", fun(Trace) -> - ?assertMatch([_, _], ?of_kind(store_batch, Trace)) - end}, - {"number of retries", fun(Trace) -> - ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace)) - end}, {"message ordering", fun(StoredMessages, _Trace) -> [{_, Stream1}, {_, Stream2}] = StoredMessages, ?assertMatch(