From 75b092bf0ed2d65e1ed2687390c95154b218b3fc Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 22 Mar 2024 22:30:28 +0100 Subject: [PATCH] fix(ds): Actually retry sending batch --- .../src/emqx_ds_builtin_db_sup.erl | 10 +- .../src/emqx_ds_builtin_metrics.erl | 132 +++++++- .../src/emqx_ds_replication_layer_egress.erl | 286 +++++++++++------- .../src/emqx_ds_storage_layer.erl | 3 + .../test/emqx_ds_SUITE.erl | 103 ++++++- .../test/emqx_ds_test_helpers.erl | 11 +- apps/emqx_prometheus/src/emqx_prometheus.erl | 26 +- 7 files changed, 411 insertions(+), 160 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 79e2f6120..a697b9276 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -31,7 +31,7 @@ ensure_shard/1, ensure_egress/1 ]). --export([which_shards/1]). +-export([which_dbs/0, which_shards/1]). %% behaviour callbacks: -export([init/1]). @@ -104,6 +104,13 @@ ensure_egress(Shard) -> which_shards(DB) -> supervisor:which_children(?via(#?shards_sup{db = DB})). +%% @doc Return the list of builtin DS databases that are currently +%% active on the node. +-spec which_dbs() -> [emqx_ds:db()]. +which_dbs() -> + Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, + gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -111,6 +118,7 @@ which_shards(DB) -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), Children = [ diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index 8075238b3..f0eac9652 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -15,11 +15,16 @@ %%-------------------------------------------------------------------- -module(emqx_ds_builtin_metrics). -%% API: +%% DS-facing API: -export([child_spec/0, init_for_db/1, shard_metric_id/2, init_for_shard/1]). + +%% Prometheus-facing API: +-export([prometheus_meta/0, prometheus_collect/1]). + -export([ inc_egress_batches/1, inc_egress_batches_retry/1, + inc_egress_batches_failed/1, inc_egress_messages/2, inc_egress_bytes/2, observe_egress_flush_time/2 @@ -42,11 +47,12 @@ -define(DB_METRICS, []). -define(SHARD_METRICS, [ - 'egress.batches', - 'egress.batches.retry', - 'egress.messages', - 'egress.bytes', - {slide, 'egress.flush_time'} + {counter, 'emqx_ds_egress_batches'}, + {counter, 'emqx_ds_egress_batches_retry'}, + {counter, 'emqx_ds_egress_batches_failed'}, + {counter, 'emqx_ds_egress_messages'}, + {counter, 'emqx_ds_egress_bytes'}, + {slide, 'emqx_ds_egress_flush_time'} ]). -type shard_metrics_id() :: binary(). @@ -59,14 +65,16 @@ child_spec() -> emqx_metrics_worker:child_spec(?WORKER). +%% @doc Initialize metrics that are global for a DS database -spec init_for_db(emqx_ds:db()) -> ok. -init_for_db(DB) -> - emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []). +init_for_db(_DB) -> + ok. 
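%% A minimal usage sketch (illustration only, not part of the patch):
%% roughly how an egress worker is expected to drive the counters and
%% the flush-time slide declared above. The DB name, shard id and
%% batch numbers below are hypothetical, and the function is not
%% exported anywhere.
example_egress_metrics_usage() ->
    MetricsId = emqx_ds_builtin_metrics:shard_metric_id(my_db, <<"0">>),
    ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
    T0 = erlang:monotonic_time(microsecond),
    %% ... flush a batch of, say, 100 messages / 10240 payload bytes
    %% to the shard here ...
    T1 = erlang:monotonic_time(microsecond),
    emqx_ds_builtin_metrics:inc_egress_batches(MetricsId),
    emqx_ds_builtin_metrics:inc_egress_messages(MetricsId, 100),
    emqx_ds_builtin_metrics:inc_egress_bytes(MetricsId, 10240),
    emqx_ds_builtin_metrics:observe_egress_flush_time(MetricsId, T1 - T0).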
-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id(). shard_metric_id(DB, ShardId) -> iolist_to_binary([atom_to_list(DB), $/, ShardId]). +%% @doc Initialize metrics that are specific for the shard. -spec init_for_shard(shard_metrics_id()) -> ok. init_for_shard(ShardId) -> emqx_metrics_worker:create_metrics(?WORKER, ShardId, ?SHARD_METRICS, []). @@ -74,28 +82,124 @@ init_for_shard(ShardId) -> %% @doc Increase the number of successfully flushed batches -spec inc_egress_batches(shard_metrics_id()) -> ok. inc_egress_batches(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches'). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches'). %% @doc Increase the number of time the egress worker had to retry %% flushing the batch -spec inc_egress_batches_retry(shard_metrics_id()) -> ok. inc_egress_batches_retry(Id) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.batches.retry'). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry'). + +%% @doc Increase the number of time the egress worker encountered an +%% unrecoverable error while trying to flush the batch +-spec inc_egress_batches_failed(shard_metrics_id()) -> ok. +inc_egress_batches_failed(Id) -> + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed'). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_messages(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.messages', NMessages). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages). %% @doc Increase the number of messages successfully saved to the shard -spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok. inc_egress_bytes(Id, NMessages) -> - emqx_metrics_worker:inc(?WORKER, Id, 'egress.bytes', NMessages). + emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages). -%% @doc Add a sample of time spent flushing the egress to the Raft log (in microseconds) +%% @doc Add a sample of elapsed time spent flushing the egress to the +%% Raft log (in microseconds) -spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok. observe_egress_flush_time(Id, FlushTime) -> - emqx_metrics_worker:observe(?WORKER, Id, 'egress.flush_time', FlushTime). + emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime). + +prometheus_meta() -> + lists:map( + fun + ({counter, A}) -> + {A, counter, A}; + ({slide, A}) -> + {A, counter, A} + end, + ?SHARD_METRICS + ). + +prometheus_collect(NodeOrAggr) -> + prometheus_per_shard(NodeOrAggr). + +%% This function returns the data in the following format: +%% ``` +%% #{emqx_ds_egress_batches => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],99408}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],99409}], +%% emqx_ds_egress_batches_retry => +%% [{[{db,emqx_persistent_message},{shard,<<"1">>}],0}, +%% {[{db,emqx_persistent_message},{shard,<<"0">>}],0}], +%% emqx_ds_egress_messages => +%% ... +%% } +%% ''' +%% +%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% labels. +prometheus_per_shard(NodeOrAggr) -> + lists:foldl( + fun(DB, Acc0) -> + lists:foldl( + fun(Shard, Acc) -> + prometheus_per_shard(NodeOrAggr, DB, Shard, Acc) + end, + Acc0, + emqx_ds_replication_layer_meta:shards(DB) + ) + end, + #{}, + emqx_ds_builtin_db_sup:which_dbs() + ). 
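%% Illustration only (values taken from the example map above): after
%% `prometheus_collect/1' these samples are rendered by emqx_prometheus
%% into Prometheus exposition lines along the lines of
%%
%%   emqx_ds_egress_batches{db="emqx_persistent_message",shard="0"} 99409
%%   emqx_ds_egress_batches{db="emqx_persistent_message",shard="1"} 99408
%%
%% and, depending on `NodeOrAggr', a node label may be appended to each
%% label set as well.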
+ +prometheus_per_shard(NodeOrAggr, DB, Shard, Acc0) -> + Labels = [ + {db, DB}, + {shard, Shard} + | case NodeOrAggr of + node -> []; + _ -> [{node, node()}] + end + ], + #{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics( + ?WORKER, shard_metric_id(DB, Shard) + ), + %% Collect counters: + Acc1 = maps:fold( + fun(MetricId, Value, Acc1) -> + append_to_key(MetricId, {Labels, Value}, Acc1) + end, + Acc0, + CC + ), + %% Collect slides: + maps:fold( + fun(MetricId, Value, Acc2) -> + Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2), + append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3) + end, + Acc1, + SS + ). + +-spec append_to_key(K, V, #{K => [V]}) -> #{K => [V]}. +append_to_key(Key, Value, Map) -> + maps:update_with( + Key, + fun(L) -> + [Value | L] + end, + [Value], + Map + ). + +slide_value(Interval, Value, Labels0) -> + Labels = [{interval, Interval} | Labels0], + {Labels, maps:get(Interval, Value, 0)}. %%================================================================================ %% Internal functions diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 9651c029e..667e1daa4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -51,13 +51,10 @@ -define(flush, flush). -record(enqueue_req, { - message :: emqx_types:message(), - sync :: boolean(), - payload_bytes :: non_neg_integer() -}). --record(enqueue_atomic_req, { - batch :: [emqx_types:message()], + messages :: [emqx_types:message()], sync :: boolean(), + atomic :: boolean(), + n_messages :: non_neg_integer(), payload_bytes :: non_neg_integer() }). @@ -70,53 +67,33 @@ start_link(DB, Shard) -> gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). -spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - ok. + emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> Sync = maps:get(sync, Opts, true), - case maps:get(atomic, Opts, false) of - false -> - lists:foreach( - fun(Message) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - gen_server:call( - ?via(DB, Shard), - #enqueue_req{ - message = Message, - sync = Sync, - payload_bytes = payload_size(Message) - }, - infinity - ) - end, - Messages + Atomic = maps:get(atomic, Opts, false), + %% Usually we expect all messages in the batch to go into the + %% single shard, so this function is optimized for the happy case. 
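    %% Illustration only (hypothetical shard id and sizes): when every
    %% message in the batch hashes to the same shard, shards_of_batch/2
    %% returns a single pair, e.g.
    %%   [{<<"1">>, {3, 362}}]
    %% i.e. {message count, total payload bytes}, and the whole batch is
    %% handed to that shard's egress server in one call. Batches that
    %% span several shards produce one such pair per shard and are either
    %% rejected (atomic) or split up by repackage_messages/4 below.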
+ case shards_of_batch(DB, Messages) of + [{Shard, {NMsgs, NBytes}}] -> + %% Happy case: + gen_server:call( + ?via(DB, Shard), + #enqueue_req{ + messages = Messages, + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = NBytes + }, + infinity ); - true -> - maps:foreach( - fun(Shard, Batch) -> - PayloadBytes = lists:foldl( - fun(Msg, Acc) -> - Acc + payload_size(Msg) - end, - 0, - Batch - ), - gen_server:call( - ?via(DB, Shard), - #enqueue_atomic_req{ - batch = Batch, - sync = Sync, - payload_bytes = PayloadBytes - }, - infinity - ) - end, - maps:groups_from_list( - fun(Message) -> - emqx_ds_replication_layer:shard_of_message(DB, Message, clientid) - end, - Messages - ) - ) + [_, _ | _] when Atomic -> + %% It's impossible to commit a batch to multiple shards + %% atomically + {error, unrecoverable, atomic_commit_to_multiple_shards}; + _Shards -> + %% Use a slower implementation for the unlikely case: + repackage_messages(DB, Messages, Sync, Atomic) end. %%================================================================================ @@ -129,7 +106,7 @@ store_batch(DB, Messages, Opts) -> metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), n = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(), - tref :: reference(), + tref :: undefined | reference(), queue :: queue:queue(emqx_types:message()), pending_replies = [] :: [gen_server:from()] }). @@ -143,16 +120,18 @@ init([DB, Shard]) -> db = DB, shard = Shard, metrics_id = MetricsId, - tref = start_timer(), queue = queue:new() }, - {ok, S}. + {ok, start_timer(S)}. -handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, From, S) -> - do_enqueue(From, Sync, Msg, NBytes, S); -handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> - Len = length(Batch), - do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S); +handle_call( + #enqueue_req{ + messages = Msgs, sync = Sync, atomic = Atomic, n_messages = NMsgs, payload_bytes = NBytes + }, + From, + S +) -> + {noreply, enqueue(From, Sync, Atomic, Msgs, NMsgs, NBytes, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. @@ -160,7 +139,7 @@ handle_cast(_Cast, S) -> {noreply, S}. handle_info(?flush, S) -> - {noreply, do_flush(S)}; + {noreply, flush(S)}; handle_info(_Info, S) -> {noreply, S}. @@ -175,9 +154,60 @@ terminate(_Reason, _S) -> %% Internal functions %%================================================================================ +enqueue( + From, + Sync, + Atomic, + Msgs, + BatchSize, + BatchBytes, + S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0, pending_replies = Replies0} +) -> + %% At this point we don't split the batches, even when they aren't + %% atomic. It wouldn't win us anything in terms of memory, and + %% EMQX currently feeds data to DS in very small batches, so + %% granularity should be fine enough. + NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), + NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), + NMsgs = NMsgs0 + BatchSize, + NBytes = NBytes0 + BatchBytes, + case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of + true -> + %% Adding this batch would cause buffer to overflow. 
Flush + %% it now, and retry: + cancel_timer(S0), + S1 = flush(S0), + enqueue(From, Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); + false -> + %% The buffer is empty, we enqueue the atomic batch in its + %% entirety: + Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), + Replies = + case Sync of + true -> + [From | Replies0]; + false -> + gen_server:reply(From, ok), + Replies0 + end, + S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1, pending_replies = Replies}, + case NMsgs >= NMax orelse NBytes >= NBytes of + true -> + cancel_timer(S1), + flush(S1); + false -> + S1 + end + end. + -define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MAX, 5000). +flush(S) -> + start_timer(do_flush(S)). + +do_flush(S0 = #s{n = 0}) -> + S0; do_flush( S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard, metrics_id = Metrics} ) -> @@ -202,73 +232,103 @@ do_flush( n = 0, n_bytes = 0, queue = queue:new(), - pending_replies = [], - tref = start_timer() + pending_replies = [] }; - Error -> - emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), + {error, recoverable, Reason} -> + %% Retry sending the batch: + emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), erlang:garbage_collect(), + %% We block the gen_server until the next retry. + BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + timer:sleep(BlockTime), ?tp( warning, emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, reason => Error} + #{db => DB, shard => Shard, reason => Reason} ), - Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), + S; + Err = {error, unrecoverable, _} -> + emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), + lists:foreach(fun(From) -> gen_server:reply(From, Err) end, Replies), + erlang:garbage_collect(), S#s{ - tref = start_timer(Cooldown) + n = 0, + n_bytes = 0, + queue = queue:new(), + pending_replies = [] } end. -do_enqueue( - From, - Sync, - MsgOrBatch, - BatchBytes, - S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies} -) -> - NBytes = NBytes0 + BatchBytes, - NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - S1 = - case MsgOrBatch of - {atomic, NumMsgs, Msgs} -> - Q = lists:foldl(fun queue:in/2, Q0, Msgs), - S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q}; - Msg -> - S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)} - end, - %% TODO: later we may want to delay the reply until the message is - %% replicated, but it requies changes to the PUBACK/PUBREC flow to - %% allow for async replies. For now, we ack when the message is - %% _buffered_ rather than stored. - %% - %% Otherwise, the client would freeze for at least flush interval, - %% or until the buffer is filled. - S2 = - case Sync of - true -> - S1#s{pending_replies = [From | Replies]}; - false -> - gen_server:reply(From, ok), - S1 - end, - S = - case N >= NMax of - true -> - _ = erlang:cancel_timer(S2#s.tref), - do_flush(S2); - false -> - S2 - end, - %% TODO: add a backpressure mechanism for the server to avoid - %% building a long message queue. - {noreply, S}. +-spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> + [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] +when + NMessages :: non_neg_integer(), + NBytes :: non_neg_integer(). 
+shards_of_batch(DB, Messages) -> + maps:to_list( + lists:foldl( + fun(Message, Acc) -> + %% TODO: sharding strategy must be part of the DS DB schema: + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S}) -> + {N + 1, S + Size} + end, + {1, Size}, + Acc + ) + end, + #{}, + Messages + ) + ). -start_timer() -> +repackage_messages(DB, Messages, Sync, Atomic) -> + Batches = lists:foldl( + fun(Message, Acc) -> + Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), + Size = payload_size(Message), + maps:update_with( + Shard, + fun({N, S, Msgs}) -> + {N + 1, S + Size, [Message | Msgs]} + end, + {1, Size, [Message]}, + Acc + ) + end, + #{}, + Messages + ), + maps:foreach( + fun(Shard, {NMsgs, ByteSize, RevMessages}) -> + gen_server:call( + ?via(DB, Shard), + #enqueue_req{ + messages = lists:reverse(RevMessages), + sync = Sync, + atomic = Atomic, + n_messages = NMsgs, + payload_bytes = ByteSize + }, + infinity + ) + end, + Batches + ). + +start_timer(S) -> Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - start_timer(Interval). + Tref = erlang:send_after(Interval, self(), ?flush), + S#s{tref = Tref}. -start_timer(Interval) -> - erlang:send_after(Interval, self(), ?flush). +cancel_timer(#s{tref = undefined}) -> + ok; +cancel_timer(#s{tref = TRef}) -> + _ = erlang:cancel_timer(TRef), + ok. %% @doc Return approximate size of the MQTT message (it doesn't take %% all things into account, for example headers and extras) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 6b85328b6..cbc0e9abf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -254,6 +254,9 @@ drop_shard(Shard) -> store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. + ?tp(emqx_ds_storage_layer_store_batch, #{ + shard => Shard, messages => Messages, options => Options + }), #{module := Mod, data := GenData} = generation_at(Shard, Time), Mod:store_batch(Shard, GenData, Messages, Options); store_batch(_Shard, [], _Options) -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 3df16dc1c..33988a974 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -322,17 +322,10 @@ t_09_atomic_store_batch(_Config) -> sync => true }) ), - - ok + {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}), + ?assertMatch(#{batch := [_, _, _]}, Flush) end, - fun(Trace) -> - %% Must contain exactly one flush with all messages. - ?assertMatch( - [#{batch := [_, _, _]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) - ), - ok - end + [] ), ok. @@ -355,14 +348,15 @@ t_10_non_atomic_store_batch(_Config) -> sync => true }) ), - - ok + timer:sleep(1000) end, fun(Trace) -> %% Should contain one flush per message. 
+ Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)), + ?assertMatch([_], Batches), ?assertMatch( - [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + [_, _, _], + lists:append(Batches) ), ok end @@ -681,10 +675,86 @@ t_error_mapping_replication_layer(_Config) -> length([error || {error, _, _} <- Results2]) > 0, Results2 ), - - snabbkaffe:stop(), meck:unload(). +%% This test suite verifies the behavior of `store_batch' operation +%% when the underlying code experiences recoverable or unrecoverable +%% problems. +t_store_batch_fail(_Config) -> + ?check_trace( + #{timetrap => 15_000}, + try + meck:new(emqx_ds_replication_layer, [passthrough, no_history]), + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, (opts())#{n_shards => 2})), + %% Success: + Batch1 = [ + message(<<"C1">>, <<"foo/bar">>, <<"1">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"2">>, 1) + ], + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch1, #{sync => true})), + %% Inject unrecoverable error: + meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(_DB, _Shard, _Messages) -> + {error, unrecoverable, mock} + end), + Batch2 = [ + message(<<"C1">>, <<"foo/bar">>, <<"3">>, 1), + message(<<"C1">>, <<"foo/bar">>, <<"4">>, 1) + ], + ?assertMatch( + {error, unrecoverable, mock}, emqx_ds:store_batch(DB, Batch2, #{sync => true}) + ), + %% Inject a recoverable error: + Batch3 = [ + message(<<"C1">>, <<"foo/bar">>, <<"5">>, 2), + message(<<"C2">>, <<"foo/bar">>, <<"6">>, 2), + message(<<"C1">>, <<"foo/bar">>, <<"7">>, 3), + message(<<"C2">>, <<"foo/bar">>, <<"8">>, 3) + ], + meck:expect(emqx_ds_replication_layer, ra_store_batch, fun(DB, Shard, Messages) -> + try + ?tp(store_batch, #{messages => Messages}), + meck:passthrough([DB, Shard, Messages]) + catch + _:_ -> + {error, recoverable, mock} + end + end), + ?inject_crash(#{?snk_kind := store_batch}, snabbkaffe_nemesis:recover_after(3)), + ?assertMatch(ok, emqx_ds:store_batch(DB, Batch3, #{sync => true})), + lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)) + after + meck:unload() + end, + [ + {"number of successfull flushes after retry", fun(Trace) -> + ?assertMatch([_, _], ?of_kind(store_batch, Trace)) + end}, + {"number of retries", fun(Trace) -> + ?assertMatch([_, _, _], ?of_kind(snabbkaffe_crash, Trace)) + end}, + {"message ordering", fun(StoredMessages, _Trace) -> + [{_, Stream1}, {_, Stream2}] = StoredMessages, + ?assertMatch( + [ + #message{payload = <<"1">>}, + #message{payload = <<"2">>}, + #message{payload = <<"5">>}, + #message{payload = <<"7">>} + ], + Stream1 + ), + ?assertMatch( + [ + #message{payload = <<"6">>}, + #message{payload = <<"8">>} + ], + Stream2 + ) + end} + ] + ). + update_data_set() -> [ [ @@ -748,6 +818,7 @@ init_per_testcase(_TC, Config) -> Config. end_per_testcase(_TC, _Config) -> + snabbkaffe:stop(), ok = application:stop(emqx_durable_storage), mria:stop(), _ = mnesia:delete_schema([node()]), diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 44c45248b..4af1e9791 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -63,9 +63,16 @@ consume(DB, TopicFilter) -> consume(DB, TopicFilter, 0). 
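%% Illustration only: consume_per_stream/3 (added to the helpers just
%% below) keeps the per-stream grouping, which is what the new
%% t_store_batch_fail case uses to assert message order within each
%% stream, along the lines of
%%
%%   [{_Stream1, [#message{payload = <<"1">>} | _]},
%%    {_Stream2, [#message{payload = <<"6">>} | _]}] =
%%       lists:sort(emqx_ds_test_helpers:consume_per_stream(DB, ['#'], 1)),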
consume(DB, TopicFilter, StartTime) -> - Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), lists:flatmap( - fun({_Rank, Stream}) -> consume_stream(DB, Stream, TopicFilter, StartTime) end, + fun({_Stream, Msgs}) -> + Msgs + end, + consume_per_stream(DB, TopicFilter, StartTime)). + +consume_per_stream(DB, TopicFilter, StartTime) -> + Streams = emqx_ds:get_streams(DB, TopicFilter, StartTime), + lists:map( + fun({_Rank, Stream}) -> {Stream, consume_stream(DB, Stream, TopicFilter, StartTime)} end, Streams ). diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 813f91dbd..044e701e3 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -212,7 +212,9 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) -> ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)), - ok = add_collect_family(Callback, ds_metric_meta(), ?MG(ds_data, RawData)), + ok = add_collect_family( + Callback, emqx_ds_builtin_metrics:prometheus_meta(), ?MG(ds_data, RawData) + ), ok = maybe_license_add_collect_family(Callback, RawData), ok; collect_mf(_Registry, _Callback) -> @@ -265,6 +267,7 @@ fetch_from_local_node(Mode) -> emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode), + ds_data => emqx_ds_builtin_metrics:prometheus_collect(Mode), mria_data => mria_data(Mode) }}. @@ -481,7 +484,14 @@ emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, [])); emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, [])); -emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])). +emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])); +%% DS +emqx_collect(K = emqx_ds_egress_batches, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, [])); +emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])). %%-------------------------------------------------------------------- %% Indicators @@ -1012,18 +1022,6 @@ catch_all(DataFun) -> _:_ -> undefined end. -%%======================================== -%% Durable storge -%%======================================== - -ds_metric_meta() -> - [ - {emqx_ds_egress_batches, counter, 'egress.batches'}, - {emqx_ds_egress_batches_retry, counter, 'egress.batches.retry'}, - {emqx_ds_egress_messages, counter, 'egress.messages'}, - {emqx_ds_egress_bytes, counter, 'egress.bytes'} - ]. - %%-------------------------------------------------------------------- %% Collect functions %%--------------------------------------------------------------------
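%% End-of-patch usage sketch (illustration only, not part of the change
%% set; the function name is made up): from a caller's point of view,
%% emqx_ds:store_batch/3 with sync => true now distinguishes the two
%% error classes produced by the egress worker.
store_batch_sketch(DB, Batch) ->
    case emqx_ds:store_batch(DB, Batch, #{sync => true}) of
        ok ->
            %% Flushed, possibly after transparent retries of
            %% {error, recoverable, _} flush results (each retry bumps
            %% the emqx_ds_egress_batches_retry counter):
            ok;
        {error, unrecoverable, _} = Err ->
            %% The buffered batch is dropped, every pending caller gets
            %% this error, and emqx_ds_egress_batches_failed is
            %% incremented:
            Err
    end.
%% Buffering itself is tuned via the emqx_durable_storage application
%% environment read in emqx_ds_replication_layer_egress above:
%% egress_batch_size (default 1000 messages), egress_batch_bytes
%% (default infinity) and egress_flush_interval (default 100 ms).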