From 09c3ae795dca9305f8b49cc329d10c35938e3f2d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 13 Jun 2024 23:53:22 +0200 Subject: [PATCH] refactor(ds_raft): Replace egress server with common emqx_ds_buffer --- .../test/emqx_ds_backends_SUITE.erl | 4 +- .../src/emqx_ds_builtin_raft_db_sup.erl | 3 +- .../src/emqx_ds_replication_layer.erl | 53 ++- .../src/emqx_ds_replication_layer_egress.erl | 392 ------------------ .../test/emqx_ds_replication_SUITE.erl | 4 +- .../src/emqx_ds_buffer.erl | 29 +- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 4 +- .../test/emqx_ds_test_helpers.erl | 2 +- 8 files changed, 62 insertions(+), 429 deletions(-) delete mode 100644 apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_egress.erl diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index 11ea1417f..d119766f3 100644 --- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -241,7 +241,7 @@ t_09_atomic_store_batch(Config) -> sync => true }) ), - {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}), + {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}), ?assertMatch(#{batch := [_, _, _]}, Flush) end, [] @@ -271,7 +271,7 @@ t_10_non_atomic_store_batch(Config) -> end, fun(Trace) -> %% Should contain one flush per message. - Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)), + Batches = ?projection(batch, ?of_kind(emqx_ds_buffer_flush, Trace)), ?assertMatch([_], Batches), ?assertMatch( [_, _, _], diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl index 74e97bf52..1816e551f 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl @@ -267,9 +267,10 @@ shard_allocator_spec(DB) -> }. egress_spec(DB, Shard) -> + Options = #{}, #{ id => Shard, - start => {emqx_ds_replication_layer_egress, start_link, [DB, Shard]}, + start => {emqx_ds_buffer, start_link, [emqx_ds_replication_layer, Options, DB, Shard]}, shutdown => 5_000, restart => permanent, type => worker diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl index 45f04e341..1c7e0c1c2 100644 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl @@ -7,6 +7,7 @@ -module(emqx_ds_replication_layer). %-behaviour(emqx_ds). +-behaviour(emqx_ds_buffer). -export([ list_shards/1, @@ -25,8 +26,12 @@ update_iterator/3, next/3, delete_next/4, - shard_of_message/3, - current_timestamp/2 + + current_timestamp/2, + + shard_of_message/4, + flush_buffer/4, + init_buffer/3 ]). %% internal exports: @@ -234,7 +239,7 @@ drop_db(DB) -> emqx_ds:store_batch_result(). store_batch(DB, Messages, Opts) -> try - emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts) + emqx_ds_buffer:store_batch(DB, Messages, Opts) catch error:{Reason, _Call} when Reason == timeout; Reason == noproc -> {error, recoverable, Reason} @@ -350,17 +355,6 @@ delete_next(DB, Iter0, Selector, BatchSize) -> Other end. --spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> - emqx_ds_replication_layer:shard_id(). -shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> - N = emqx_ds_replication_shard_allocator:n_shards(DB), - Hash = - case SerializeBy of - clientid -> erlang:phash2(From, N); - topic -> erlang:phash2(Topic, N) - end, - integer_to_binary(Hash). - -spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok. foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). @@ -372,9 +366,38 @@ current_timestamp(DB, Shard) -> emqx_ds_builtin_raft_sup:get_gvar(DB, ?gv_timestamp(Shard), 0). %%================================================================================ -%% behavior callbacks +%% emqx_ds_buffer callbacks %%================================================================================ +-record(bs, {}). +-type egress_state() :: #bs{}. + +-spec init_buffer(emqx_ds:db(), shard_id(), _Options) -> {ok, egress_state()}. +init_buffer(_DB, _Shard, _Options) -> + {ok, #bs{}}. + +-spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) -> + {egress_state(), ok | {error, recoverable | unrecoverable, _}}. +flush_buffer(DB, Shard, Messages, State) -> + case ra_store_batch(DB, Shard, Messages) of + {timeout, ServerId} -> + Result = {error, recoverable, {timeout, ServerId}}; + Result -> + ok + end, + {State, Result}. + +-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) -> + emqx_ds_replication_layer:shard_id(). +shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) -> + N = emqx_ds_replication_shard_allocator:n_shards(DB), + Hash = + case SerializeBy of + clientid -> erlang:phash2(From, N); + topic -> erlang:phash2(Topic, N) + end, + integer_to_binary(Hash). + %%================================================================================ %% Internal exports (RPC targets) %%================================================================================ diff --git a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_egress.erl deleted file mode 100644 index ce117011c..000000000 --- a/apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_egress.erl +++ /dev/null @@ -1,392 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - -%% @doc Egress servers are responsible for proxing the outcoming -%% `store_batch' requests towards EMQX DS shards. -%% -%% They re-assemble messages from different local processes into -%% fixed-sized batches, and introduce centralized channels between the -%% nodes. They are also responsible for maintaining backpressure -%% towards the local publishers. -%% -%% There is (currently) one egress process for each shard running on -%% each node, but it should be possible to have a pool of egress -%% servers, if needed. --module(emqx_ds_replication_layer_egress). - --behaviour(gen_server). - -%% API: --export([start_link/2, store_batch/3]). - -%% behavior callbacks: --export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - -%% internal exports: --export([]). - --export_type([]). - --include_lib("emqx_utils/include/emqx_message.hrl"). --include_lib("snabbkaffe/include/trace.hrl"). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}). --define(flush, flush). - --record(enqueue_req, { - messages :: [emqx_types:message()], - sync :: boolean(), - atomic :: boolean(), - n_messages :: non_neg_integer(), - payload_bytes :: non_neg_integer() -}). - -%%================================================================================ -%% API functions -%%================================================================================ - --spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}. -start_link(DB, Shard) -> - gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []). - --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> - emqx_ds:store_batch_result(). -store_batch(DB, Messages, Opts) -> - Sync = maps:get(sync, Opts, true), - Atomic = maps:get(atomic, Opts, false), - %% Usually we expect all messages in the batch to go into the - %% single shard, so this function is optimized for the happy case. - case shards_of_batch(DB, Messages) of - [{Shard, {NMsgs, NBytes}}] -> - %% Happy case: - enqueue_call_or_cast( - ?via(DB, Shard), - #enqueue_req{ - messages = Messages, - sync = Sync, - atomic = Atomic, - n_messages = NMsgs, - payload_bytes = NBytes - } - ); - [_, _ | _] when Atomic -> - %% It's impossible to commit a batch to multiple shards - %% atomically - {error, unrecoverable, atomic_commit_to_multiple_shards}; - _Shards -> - %% Use a slower implementation for the unlikely case: - repackage_messages(DB, Messages, Sync) - end. - -%%================================================================================ -%% behavior callbacks -%%================================================================================ - --record(s, { - db :: emqx_ds:db(), - shard :: emqx_ds_replication_layer:shard_id(), - metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), - n_retries = 0 :: non_neg_integer(), - %% FIXME: Currently max_retries is always 0, because replication - %% layer doesn't guarantee idempotency. Retrying would create - %% duplicate messages. - max_retries = 0 :: non_neg_integer(), - n = 0 :: non_neg_integer(), - n_bytes = 0 :: non_neg_integer(), - tref :: undefined | reference(), - queue :: queue:queue(emqx_types:message()), - pending_replies = [] :: [gen_server:from()] -}). - -init([DB, Shard]) -> - process_flag(trap_exit, true), - process_flag(message_queue_data, off_heap), - logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}), - MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), - ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), - S = #s{ - db = DB, - shard = Shard, - metrics_id = MetricsId, - queue = queue:new() - }, - {ok, S}. - -format_status(Status) -> - maps:map( - fun - (state, #s{db = DB, shard = Shard, queue = Q}) -> - #{ - db => DB, - shard => Shard, - queue => queue:len(Q) - }; - (_, Val) -> - Val - end, - Status - ). - -handle_call( - #enqueue_req{ - messages = Msgs, - sync = Sync, - atomic = Atomic, - n_messages = NMsgs, - payload_bytes = NBytes - }, - From, - S0 = #s{pending_replies = Replies0} -) -> - S = S0#s{pending_replies = [From | Replies0]}, - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; -handle_call(_Call, _From, S) -> - {reply, {error, unknown_call}, S}. - -handle_cast( - #enqueue_req{ - messages = Msgs, - sync = Sync, - atomic = Atomic, - n_messages = NMsgs, - payload_bytes = NBytes - }, - S -) -> - {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)}; -handle_cast(_Cast, S) -> - {noreply, S}. - -handle_info(?flush, S) -> - {noreply, flush(S)}; -handle_info(_Info, S) -> - {noreply, S}. - -terminate(_Reason, _S) -> - ok. - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ - -enqueue( - Sync, - Atomic, - Msgs, - BatchSize, - BatchBytes, - S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0} -) -> - %% At this point we don't split the batches, even when they aren't - %% atomic. It wouldn't win us anything in terms of memory, and - %% EMQX currently feeds data to DS in very small batches, so - %% granularity should be fine enough. - NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), - NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity), - NMsgs = NMsgs0 + BatchSize, - NBytes = NBytes0 + BatchBytes, - case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of - true -> - %% Adding this batch would cause buffer to overflow. Flush - %% it now, and retry: - S1 = flush(S0), - enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1); - false -> - %% The buffer is empty, we enqueue the atomic batch in its - %% entirety: - Q1 = lists:foldl(fun queue:in/2, Q0, Msgs), - S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1}, - case NMsgs >= NMax orelse NBytes >= NBytesMax of - true -> - flush(S1); - false -> - ensure_timer(S1) - end - end. - --define(COOLDOWN_MIN, 1000). --define(COOLDOWN_MAX, 5000). - -flush(S) -> - do_flush(cancel_timer(S)). - -do_flush(S0 = #s{n = 0}) -> - S0; -do_flush( - S = #s{ - queue = Q, - pending_replies = Replies, - db = DB, - shard = Shard, - metrics_id = Metrics, - n_retries = Retries, - max_retries = MaxRetries - } -) -> - Messages = queue:to_list(Q), - T0 = erlang:monotonic_time(microsecond), - Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages), - T1 = erlang:monotonic_time(microsecond), - emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0), - case Result of - ok -> - emqx_ds_builtin_metrics:inc_egress_batches(Metrics), - emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), - emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), - ?tp( - emqx_ds_replication_layer_egress_flush, - #{db => DB, shard => Shard, batch => Messages} - ), - lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), - erlang:garbage_collect(), - S#s{ - n = 0, - n_bytes = 0, - queue = queue:new(), - pending_replies = [] - }; - {timeout, ServerId} when Retries < MaxRetries -> - %% Note: this is a hot loop, so we report error messages - %% with `debug' level to avoid wiping the logs. Instead, - %% error the detection must rely on the metrics. Debug - %% logging can be enabled for the particular egress server - %% via logger domain. - ?tp( - debug, - emqx_ds_replication_layer_egress_flush_retry, - #{db => DB, shard => Shard, reason => timeout, server_id => ServerId} - ), - %% Retry sending the batch: - emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), - erlang:garbage_collect(), - %% We block the gen_server until the next retry. - BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), - timer:sleep(BlockTime), - S#s{n_retries = Retries + 1}; - Err -> - ?tp( - debug, - emqx_ds_replication_layer_egress_flush_failed, - #{db => DB, shard => Shard, error => Err} - ), - emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), - Reply = - case Err of - {error, _, _} -> Err; - {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}}; - _ -> {error, unrecoverable, Err} - end, - lists:foreach( - fun(From) -> gen_server:reply(From, Reply) end, Replies - ), - erlang:garbage_collect(), - S#s{ - n = 0, - n_bytes = 0, - queue = queue:new(), - pending_replies = [], - n_retries = 0 - } - end. - --spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> - [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] -when - NMessages :: non_neg_integer(), - NBytes :: non_neg_integer(). -shards_of_batch(DB, Messages) -> - maps:to_list( - lists:foldl( - fun(Message, Acc) -> - %% TODO: sharding strategy must be part of the DS DB schema: - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - Size = payload_size(Message), - maps:update_with( - Shard, - fun({N, S}) -> - {N + 1, S + Size} - end, - {1, Size}, - Acc - ) - end, - #{}, - Messages - ) - ). - -repackage_messages(DB, Messages, Sync) -> - Batches = lists:foldl( - fun(Message, Acc) -> - Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid), - Size = payload_size(Message), - maps:update_with( - Shard, - fun({N, S, Msgs}) -> - {N + 1, S + Size, [Message | Msgs]} - end, - {1, Size, [Message]}, - Acc - ) - end, - #{}, - Messages - ), - maps:fold( - fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) -> - Err = enqueue_call_or_cast( - ?via(DB, Shard), - #enqueue_req{ - messages = lists:reverse(RevMessages), - sync = Sync, - atomic = false, - n_messages = NMsgs, - payload_bytes = ByteSize - } - ), - compose_errors(ErrAcc, Err) - end, - ok, - Batches - ). - -enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) -> - gen_server:call(To, Req, infinity); -enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) -> - gen_server:cast(To, Req). - -compose_errors(ErrAcc, ok) -> - ErrAcc; -compose_errors(ok, Err) -> - Err; -compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) -> - {error, unrecoverable, Err}; -compose_errors(ErrAcc, _Err) -> - ErrAcc. - -ensure_timer(S = #s{tref = undefined}) -> - Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), - Tref = erlang:send_after(Interval, self(), ?flush), - S#s{tref = Tref}; -ensure_timer(S) -> - S. - -cancel_timer(S = #s{tref = undefined}) -> - S; -cancel_timer(S = #s{tref = TRef}) -> - _ = erlang:cancel_timer(TRef), - S#s{tref = undefined}. - -%% @doc Return approximate size of the MQTT message (it doesn't take -%% all things into account, for example headers and extras) -payload_size(#message{payload = P, topic = T}) -> - size(P) + size(T). diff --git a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl index 978da91a4..3bb2ba4c4 100644 --- a/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl @@ -630,8 +630,8 @@ t_error_mapping_replication_layer(_Config) -> ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), - ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}), - ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}), + ?block_until(#{?snk_kind := emqx_ds_buffer_flush, shard := Shard1}), + ?block_until(#{?snk_kind := emqx_ds_buffer_flush, shard := Shard2}), Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0), Iterators0 = lists:map( diff --git a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl index f6f6c6241..f0cf4fe83 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_buffer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_buffer.erl @@ -21,7 +21,7 @@ -behaviour(gen_server). %% API: --export([start_link/4, store_batch/3]). +-export([start_link/4, store_batch/3, shard_of_message/3]). %% behavior callbacks: -export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -63,7 +63,7 @@ %% API functions %%================================================================================ --spec start_link(module(), _CallbackOptions, emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> +-spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) -> {ok, pid()}. start_link(CallbackModule, CallbackOptions, DB, Shard) -> gen_server:start_link( @@ -99,6 +99,11 @@ store_batch(DB, Messages, Opts) -> repackage_messages(DB, Messages, Sync) end. +-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard. +shard_of_message(DB, Message, ShardBy) -> + {CBM, Options} = persistent_term:get(?cbm(DB)), + CBM:shard_of_message(DB, Message, ShardBy, Options). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -107,7 +112,7 @@ store_batch(DB, Messages, Opts) -> callback_module :: module(), callback_state :: term(), db :: emqx_ds:db(), - shard :: emqx_ds_replication_layer:shard_id(), + shard :: _ShardId, metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(), n_retries = 0 :: non_neg_integer(), %% FIXME: Currently max_retries is always 0, because replication @@ -124,7 +129,7 @@ store_batch(DB, Messages, Opts) -> init([CBM, CBMOptions, DB, Shard]) -> process_flag(trap_exit, true), process_flag(message_queue_data, off_heap), - logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}), + logger:update_process_metadata(#{domain => [emqx, ds, buffer, DB]}), MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard), ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId), {ok, CallbackS} = CBM:init_buffer(DB, Shard, CBMOptions), @@ -236,10 +241,6 @@ enqueue( end end. -shard_of_message(DB, Message, ShardBy) -> - {CBM, Options} = persistent_term:get(?cbm(DB)), - CBM:shard_of_message(DB, Message, ShardBy, Options). - -define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MAX, 5000). @@ -273,7 +274,7 @@ do_flush( emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n), emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes), ?tp( - emqx_ds_replication_layer_egress_flush, + emqx_ds_buffer_flush, #{db => DB, shard => Shard, batch => Messages} ), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), @@ -285,7 +286,7 @@ do_flush( queue = queue:new(), pending_replies = [] }; - {timeout, ServerId} when Retries < MaxRetries -> + {error, recoverable, Err} when Retries < MaxRetries -> %% Note: this is a hot loop, so we report error messages %% with `debug' level to avoid wiping the logs. Instead, %% error the detection must rely on the metrics. Debug @@ -293,8 +294,8 @@ do_flush( %% via logger domain. ?tp( debug, - emqx_ds_replication_layer_egress_flush_retry, - #{db => DB, shard => Shard, reason => timeout, server_id => ServerId} + emqx_ds_buffer_flush_retry, + #{db => DB, shard => Shard, reason => Err} ), %% Retry sending the batch: emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics), @@ -306,7 +307,7 @@ do_flush( Err -> ?tp( debug, - emqx_ds_replication_layer_egress_flush_failed, + emqx_ds_buffer_flush_failed, #{db => DB, shard => Shard, error => Err} ), emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics), @@ -330,7 +331,7 @@ do_flush( end. -spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) -> - [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}] + [{_ShardId, {NMessages, NBytes}}] when NMessages :: non_neg_integer(), NBytes :: non_neg_integer(). diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index bf820e0bf..54033ae78 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -276,7 +276,7 @@ t_atomic_store_batch(_Config) -> %% Must contain exactly one flush with all messages. ?assertMatch( [#{batch := [_, _, _]}], - ?of_kind(emqx_ds_replication_layer_egress_flush, Trace) + ?of_kind(emqx_ds_buffer_flush, Trace) ), ok end @@ -305,7 +305,7 @@ t_non_atomic_store_batch(_Config) -> end, fun(ExpectedMsgs, Trace) -> ProcessedMsgs = lists:append( - ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)) + ?projection(batch, ?of_kind(emqx_ds_buffer_flush, Trace)) ), ?assertEqual( ExpectedMsgs, diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index 5e7753058..7130041ec 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -310,7 +310,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) -> shard_of_clientid(DB, Node, ClientId) -> ?ON( Node, - emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid) + emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid) ). %% Consume eagerly: