refactor(ds_raft): Replace egress server with common emqx_ds_buffer
commit 09c3ae795d (parent a0fbd37e58)
@@ -241,7 +241,7 @@ t_09_atomic_store_batch(Config) ->
                     sync => true
                 })
             ),
-            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush}),
+            {ok, Flush} = ?block_until(#{?snk_kind := emqx_ds_buffer_flush}),
             ?assertMatch(#{batch := [_, _, _]}, Flush)
         end,
         []
@@ -271,7 +271,7 @@ t_10_non_atomic_store_batch(Config) ->
         end,
         fun(Trace) ->
             %% Should contain one flush per message.
-            Batches = ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)),
+            Batches = ?projection(batch, ?of_kind(emqx_ds_buffer_flush, Trace)),
             ?assertMatch([_], Batches),
             ?assertMatch(
                 [_, _, _],
@@ -267,9 +267,10 @@ shard_allocator_spec(DB) ->
     }.
 
 egress_spec(DB, Shard) ->
+    Options = #{},
     #{
         id => Shard,
-        start => {emqx_ds_replication_layer_egress, start_link, [DB, Shard]},
+        start => {emqx_ds_buffer, start_link, [emqx_ds_replication_layer, Options, DB, Shard]},
         shutdown => 5_000,
         restart => permanent,
         type => worker
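The hunk above is now the only backend-specific wiring: the supervisor hands the generic buffer a callback module (here emqx_ds_replication_layer) plus an options term. Another backend could reuse the same worker by swapping the callback module; a minimal sketch, assuming a hypothetical callback module my_ds_backend:

    %% Hypothetical child spec for another backend reusing emqx_ds_buffer:
    buffer_spec(DB, Shard) ->
        Options = #{},
        #{
            id => {buffer, Shard},
            start => {emqx_ds_buffer, start_link, [my_ds_backend, Options, DB, Shard]},
            shutdown => 5_000,
            restart => permanent,
            type => worker
        }.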
@@ -7,6 +7,7 @@
 -module(emqx_ds_replication_layer).
 
 %-behaviour(emqx_ds).
+-behaviour(emqx_ds_buffer).
 
 -export([
     list_shards/1,
@@ -25,8 +26,12 @@
     update_iterator/3,
     next/3,
     delete_next/4,
-    shard_of_message/3,
-    current_timestamp/2
+    current_timestamp/2,
+
+    shard_of_message/4,
+    flush_buffer/4,
+    init_buffer/3
 ]).
 
 %% internal exports:
@@ -234,7 +239,7 @@ drop_db(DB) ->
     emqx_ds:store_batch_result().
 store_batch(DB, Messages, Opts) ->
     try
-        emqx_ds_replication_layer_egress:store_batch(DB, Messages, Opts)
+        emqx_ds_buffer:store_batch(DB, Messages, Opts)
     catch
         error:{Reason, _Call} when Reason == timeout; Reason == noproc ->
            {error, recoverable, Reason}
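Callers are unaffected by the rename: batches still enter through emqx_ds:store_batch/3, and a buffer worker that is down or overloaded still surfaces as a recoverable error, as the try/catch above shows. A small usage sketch (the retry policy is illustrative, not part of the API):

    store_with_retry(DB, Msgs, 0) ->
        emqx_ds:store_batch(DB, Msgs, #{sync => true});
    store_with_retry(DB, Msgs, Retries) when Retries > 0 ->
        case emqx_ds:store_batch(DB, Msgs, #{sync => true}) of
            {error, recoverable, _Reason} ->
                %% buffer worker busy or restarting; back off and try again
                timer:sleep(100),
                store_with_retry(DB, Msgs, Retries - 1);
            Other ->
                Other
        end.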
@@ -350,17 +355,6 @@ delete_next(DB, Iter0, Selector, BatchSize) ->
             Other
     end.
 
--spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
-    emqx_ds_replication_layer:shard_id().
-shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
-    N = emqx_ds_replication_shard_allocator:n_shards(DB),
-    Hash =
-        case SerializeBy of
-            clientid -> erlang:phash2(From, N);
-            topic -> erlang:phash2(Topic, N)
-        end,
-    integer_to_binary(Hash).
-
 -spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok.
 foreach_shard(DB, Fun) ->
     lists:foreach(Fun, list_shards(DB)).
@@ -372,9 +366,38 @@ current_timestamp(DB, Shard) ->
     emqx_ds_builtin_raft_sup:get_gvar(DB, ?gv_timestamp(Shard), 0).
 
 %%================================================================================
-%% behavior callbacks
+%% emqx_ds_buffer callbacks
 %%================================================================================
 
+-record(bs, {}).
+-type egress_state() :: #bs{}.
+
+-spec init_buffer(emqx_ds:db(), shard_id(), _Options) -> {ok, egress_state()}.
+init_buffer(_DB, _Shard, _Options) ->
+    {ok, #bs{}}.
+
+-spec flush_buffer(emqx_ds:db(), shard_id(), [emqx_types:message()], egress_state()) ->
+    {egress_state(), ok | {error, recoverable | unrecoverable, _}}.
+flush_buffer(DB, Shard, Messages, State) ->
+    case ra_store_batch(DB, Shard, Messages) of
+        {timeout, ServerId} ->
+            Result = {error, recoverable, {timeout, ServerId}};
+        Result ->
+            ok
+    end,
+    {State, Result}.
+
+-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) ->
+    emqx_ds_replication_layer:shard_id().
+shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy, _Options) ->
+    N = emqx_ds_replication_shard_allocator:n_shards(DB),
+    Hash =
+        case SerializeBy of
+            clientid -> erlang:phash2(From, N);
+            topic -> erlang:phash2(Topic, N)
+        end,
+    integer_to_binary(Hash).
+
 %%================================================================================
 %% Internal exports (RPC targets)
 %%================================================================================
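These three functions are the entire surface a backend has to implement for emqx_ds_buffer. Judging from the implementations above, the behaviour presumably declares callbacks along these lines (a sketch, not the authoritative definition from emqx_ds_buffer):

    -callback init_buffer(emqx_ds:db(), _Shard, _Options) -> {ok, _State}.
    -callback shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic, _Options) ->
        _Shard.
    -callback flush_buffer(emqx_ds:db(), _Shard, [emqx_types:message()], State) ->
        {State, ok | {error, recoverable | unrecoverable, _}}.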
@@ -1,392 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%--------------------------------------------------------------------
-
-%% @doc Egress servers are responsible for proxing the outcoming
-%% `store_batch' requests towards EMQX DS shards.
-%%
-%% They re-assemble messages from different local processes into
-%% fixed-sized batches, and introduce centralized channels between the
-%% nodes. They are also responsible for maintaining backpressure
-%% towards the local publishers.
-%%
-%% There is (currently) one egress process for each shard running on
-%% each node, but it should be possible to have a pool of egress
-%% servers, if needed.
--module(emqx_ds_replication_layer_egress).
-
--behaviour(gen_server).
-
-%% API:
--export([start_link/2, store_batch/3]).
-
-%% behavior callbacks:
--export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-
-%% internal exports:
--export([]).
-
--export_type([]).
-
--include_lib("emqx_utils/include/emqx_message.hrl").
--include_lib("snabbkaffe/include/trace.hrl").
-
-%%================================================================================
-%% Type declarations
-%%================================================================================
-
--define(via(DB, Shard), {via, gproc, {n, l, {?MODULE, DB, Shard}}}).
--define(flush, flush).
-
--record(enqueue_req, {
-    messages :: [emqx_types:message()],
-    sync :: boolean(),
-    atomic :: boolean(),
-    n_messages :: non_neg_integer(),
-    payload_bytes :: non_neg_integer()
-}).
-
-%%================================================================================
-%% API functions
-%%================================================================================
-
--spec start_link(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, pid()}.
-start_link(DB, Shard) ->
-    gen_server:start_link(?via(DB, Shard), ?MODULE, [DB, Shard], []).
-
--spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) ->
-    emqx_ds:store_batch_result().
-store_batch(DB, Messages, Opts) ->
-    Sync = maps:get(sync, Opts, true),
-    Atomic = maps:get(atomic, Opts, false),
-    %% Usually we expect all messages in the batch to go into the
-    %% single shard, so this function is optimized for the happy case.
-    case shards_of_batch(DB, Messages) of
-        [{Shard, {NMsgs, NBytes}}] ->
-            %% Happy case:
-            enqueue_call_or_cast(
-                ?via(DB, Shard),
-                #enqueue_req{
-                    messages = Messages,
-                    sync = Sync,
-                    atomic = Atomic,
-                    n_messages = NMsgs,
-                    payload_bytes = NBytes
-                }
-            );
-        [_, _ | _] when Atomic ->
-            %% It's impossible to commit a batch to multiple shards
-            %% atomically
-            {error, unrecoverable, atomic_commit_to_multiple_shards};
-        _Shards ->
-            %% Use a slower implementation for the unlikely case:
-            repackage_messages(DB, Messages, Sync)
-    end.
-
-%%================================================================================
-%% behavior callbacks
-%%================================================================================
-
--record(s, {
-    db :: emqx_ds:db(),
-    shard :: emqx_ds_replication_layer:shard_id(),
-    metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
-    n_retries = 0 :: non_neg_integer(),
-    %% FIXME: Currently max_retries is always 0, because replication
-    %% layer doesn't guarantee idempotency. Retrying would create
-    %% duplicate messages.
-    max_retries = 0 :: non_neg_integer(),
-    n = 0 :: non_neg_integer(),
-    n_bytes = 0 :: non_neg_integer(),
-    tref :: undefined | reference(),
-    queue :: queue:queue(emqx_types:message()),
-    pending_replies = [] :: [gen_server:from()]
-}).
-
-init([DB, Shard]) ->
-    process_flag(trap_exit, true),
-    process_flag(message_queue_data, off_heap),
-    logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
-    MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
-    ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
-    S = #s{
-        db = DB,
-        shard = Shard,
-        metrics_id = MetricsId,
-        queue = queue:new()
-    },
-    {ok, S}.
-
-format_status(Status) ->
-    maps:map(
-        fun
-            (state, #s{db = DB, shard = Shard, queue = Q}) ->
-                #{
-                    db => DB,
-                    shard => Shard,
-                    queue => queue:len(Q)
-                };
-            (_, Val) ->
-                Val
-        end,
-        Status
-    ).
-
-handle_call(
-    #enqueue_req{
-        messages = Msgs,
-        sync = Sync,
-        atomic = Atomic,
-        n_messages = NMsgs,
-        payload_bytes = NBytes
-    },
-    From,
-    S0 = #s{pending_replies = Replies0}
-) ->
-    S = S0#s{pending_replies = [From | Replies0]},
-    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
-handle_call(_Call, _From, S) ->
-    {reply, {error, unknown_call}, S}.
-
-handle_cast(
-    #enqueue_req{
-        messages = Msgs,
-        sync = Sync,
-        atomic = Atomic,
-        n_messages = NMsgs,
-        payload_bytes = NBytes
-    },
-    S
-) ->
-    {noreply, enqueue(Sync, Atomic, Msgs, NMsgs, NBytes, S)};
-handle_cast(_Cast, S) ->
-    {noreply, S}.
-
-handle_info(?flush, S) ->
-    {noreply, flush(S)};
-handle_info(_Info, S) ->
-    {noreply, S}.
-
-terminate(_Reason, _S) ->
-    ok.
-
-%%================================================================================
-%% Internal exports
-%%================================================================================
-
-%%================================================================================
-%% Internal functions
-%%================================================================================
-
-enqueue(
-    Sync,
-    Atomic,
-    Msgs,
-    BatchSize,
-    BatchBytes,
-    S0 = #s{n = NMsgs0, n_bytes = NBytes0, queue = Q0}
-) ->
-    %% At this point we don't split the batches, even when they aren't
-    %% atomic. It wouldn't win us anything in terms of memory, and
-    %% EMQX currently feeds data to DS in very small batches, so
-    %% granularity should be fine enough.
-    NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    NBytesMax = application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
-    NMsgs = NMsgs0 + BatchSize,
-    NBytes = NBytes0 + BatchBytes,
-    case (NMsgs >= NMax orelse NBytes >= NBytesMax) andalso (NMsgs0 > 0) of
-        true ->
-            %% Adding this batch would cause buffer to overflow. Flush
-            %% it now, and retry:
-            S1 = flush(S0),
-            enqueue(Sync, Atomic, Msgs, BatchSize, BatchBytes, S1);
-        false ->
-            %% The buffer is empty, we enqueue the atomic batch in its
-            %% entirety:
-            Q1 = lists:foldl(fun queue:in/2, Q0, Msgs),
-            S1 = S0#s{n = NMsgs, n_bytes = NBytes, queue = Q1},
-            case NMsgs >= NMax orelse NBytes >= NBytesMax of
-                true ->
-                    flush(S1);
-                false ->
-                    ensure_timer(S1)
-            end
-    end.
-
--define(COOLDOWN_MIN, 1000).
--define(COOLDOWN_MAX, 5000).
-
-flush(S) ->
-    do_flush(cancel_timer(S)).
-
-do_flush(S0 = #s{n = 0}) ->
-    S0;
-do_flush(
-    S = #s{
-        queue = Q,
-        pending_replies = Replies,
-        db = DB,
-        shard = Shard,
-        metrics_id = Metrics,
-        n_retries = Retries,
-        max_retries = MaxRetries
-    }
-) ->
-    Messages = queue:to_list(Q),
-    T0 = erlang:monotonic_time(microsecond),
-    Result = emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages),
-    T1 = erlang:monotonic_time(microsecond),
-    emqx_ds_builtin_metrics:observe_egress_flush_time(Metrics, T1 - T0),
-    case Result of
-        ok ->
-            emqx_ds_builtin_metrics:inc_egress_batches(Metrics),
-            emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
-            emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
-            ?tp(
-                emqx_ds_replication_layer_egress_flush,
-                #{db => DB, shard => Shard, batch => Messages}
-            ),
-            lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
-            erlang:garbage_collect(),
-            S#s{
-                n = 0,
-                n_bytes = 0,
-                queue = queue:new(),
-                pending_replies = []
-            };
-        {timeout, ServerId} when Retries < MaxRetries ->
-            %% Note: this is a hot loop, so we report error messages
-            %% with `debug' level to avoid wiping the logs. Instead,
-            %% error the detection must rely on the metrics. Debug
-            %% logging can be enabled for the particular egress server
-            %% via logger domain.
-            ?tp(
-                debug,
-                emqx_ds_replication_layer_egress_flush_retry,
-                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
-            ),
-            %% Retry sending the batch:
-            emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
-            erlang:garbage_collect(),
-            %% We block the gen_server until the next retry.
-            BlockTime = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
-            timer:sleep(BlockTime),
-            S#s{n_retries = Retries + 1};
-        Err ->
-            ?tp(
-                debug,
-                emqx_ds_replication_layer_egress_flush_failed,
-                #{db => DB, shard => Shard, error => Err}
-            ),
-            emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
-            Reply =
-                case Err of
-                    {error, _, _} -> Err;
-                    {timeout, ServerId} -> {error, recoverable, {timeout, ServerId}};
-                    _ -> {error, unrecoverable, Err}
-                end,
-            lists:foreach(
-                fun(From) -> gen_server:reply(From, Reply) end, Replies
-            ),
-            erlang:garbage_collect(),
-            S#s{
-                n = 0,
-                n_bytes = 0,
-                queue = queue:new(),
-                pending_replies = [],
-                n_retries = 0
-            }
-    end.
-
--spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
-    [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}]
-when
-    NMessages :: non_neg_integer(),
-    NBytes :: non_neg_integer().
-shards_of_batch(DB, Messages) ->
-    maps:to_list(
-        lists:foldl(
-            fun(Message, Acc) ->
-                %% TODO: sharding strategy must be part of the DS DB schema:
-                Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-                Size = payload_size(Message),
-                maps:update_with(
-                    Shard,
-                    fun({N, S}) ->
-                        {N + 1, S + Size}
-                    end,
-                    {1, Size},
-                    Acc
-                )
-            end,
-            #{},
-            Messages
-        )
-    ).
-
-repackage_messages(DB, Messages, Sync) ->
-    Batches = lists:foldl(
-        fun(Message, Acc) ->
-            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-            Size = payload_size(Message),
-            maps:update_with(
-                Shard,
-                fun({N, S, Msgs}) ->
-                    {N + 1, S + Size, [Message | Msgs]}
-                end,
-                {1, Size, [Message]},
-                Acc
-            )
-        end,
-        #{},
-        Messages
-    ),
-    maps:fold(
-        fun(Shard, {NMsgs, ByteSize, RevMessages}, ErrAcc) ->
-            Err = enqueue_call_or_cast(
-                ?via(DB, Shard),
-                #enqueue_req{
-                    messages = lists:reverse(RevMessages),
-                    sync = Sync,
-                    atomic = false,
-                    n_messages = NMsgs,
-                    payload_bytes = ByteSize
-                }
-            ),
-            compose_errors(ErrAcc, Err)
-        end,
-        ok,
-        Batches
-    ).
-
-enqueue_call_or_cast(To, Req = #enqueue_req{sync = true}) ->
-    gen_server:call(To, Req, infinity);
-enqueue_call_or_cast(To, Req = #enqueue_req{sync = false}) ->
-    gen_server:cast(To, Req).
-
-compose_errors(ErrAcc, ok) ->
-    ErrAcc;
-compose_errors(ok, Err) ->
-    Err;
-compose_errors({error, recoverable, _}, {error, unrecoverable, Err}) ->
-    {error, unrecoverable, Err};
-compose_errors(ErrAcc, _Err) ->
-    ErrAcc.
-
-ensure_timer(S = #s{tref = undefined}) ->
-    Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
-    Tref = erlang:send_after(Interval, self(), ?flush),
-    S#s{tref = Tref};
-ensure_timer(S) ->
-    S.
-
-cancel_timer(S = #s{tref = undefined}) ->
-    S;
-cancel_timer(S = #s{tref = TRef}) ->
-    _ = erlang:cancel_timer(TRef),
-    S#s{tref = undefined}.
-
-%% @doc Return approximate size of the MQTT message (it doesn't take
-%% all things into account, for example headers and extras)
-payload_size(#message{payload = P, topic = T}) ->
-    size(P) + size(T).
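Everything deleted above (batch assembly, the flush timer, backpressure on callers) now lives in the generic emqx_ds_buffer module; only the flush target became a behaviour callback. The tuning knobs are presumably still the same emqx_durable_storage application environment keys the deleted code read; a sketch summarizing them:

    %% Limits that trigger a flush, as read by the (now deleted) egress server:
    buffer_limits() ->
        #{
            %% flush once this many messages are buffered (default 1000):
            max_messages => application:get_env(emqx_durable_storage, egress_batch_size, 1000),
            %% ...or once buffered payloads reach this many bytes (default: no limit):
            max_bytes => application:get_env(emqx_durable_storage, egress_batch_bytes, infinity),
            %% ...or once this many milliseconds pass after the first enqueue (default 100):
            flush_interval => application:get_env(emqx_durable_storage, egress_flush_interval, 100)
        }.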
@@ -630,8 +630,8 @@ t_error_mapping_replication_layer(_Config) ->
 
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)),
 
-    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard1}),
-    ?block_until(#{?snk_kind := emqx_ds_replication_layer_egress_flush, shard := Shard2}),
+    ?block_until(#{?snk_kind := emqx_ds_buffer_flush, shard := Shard1}),
+    ?block_until(#{?snk_kind := emqx_ds_buffer_flush, shard := Shard2}),
 
     Streams0 = emqx_ds:get_streams(DB, TopicFilter, 0),
     Iterators0 = lists:map(
@@ -21,7 +21,7 @@
 -behaviour(gen_server).
 
 %% API:
--export([start_link/4, store_batch/3]).
+-export([start_link/4, store_batch/3, shard_of_message/3]).
 
 %% behavior callbacks:
 -export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@@ -63,7 +63,7 @@
 %% API functions
 %%================================================================================
 
--spec start_link(module(), _CallbackOptions, emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+-spec start_link(module(), _CallbackOptions, emqx_ds:db(), _ShardId) ->
     {ok, pid()}.
 start_link(CallbackModule, CallbackOptions, DB, Shard) ->
     gen_server:start_link(
@@ -99,6 +99,11 @@ store_batch(DB, Messages, Opts) ->
             repackage_messages(DB, Messages, Sync)
     end.
 
+-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> _Shard.
+shard_of_message(DB, Message, ShardBy) ->
+    {CBM, Options} = persistent_term:get(?cbm(DB)),
+    CBM:shard_of_message(DB, Message, ShardBy, Options).
+
 %%================================================================================
 %% behavior callbacks
 %%================================================================================
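shard_of_message/3 resolves the callback module through a persistent_term entry keyed by the DB, so it works from any process without a round-trip to the buffer worker. The registration side is not shown in these hunks; presumably the worker stores the pair at startup, roughly like this (the exact ?cbm key and the place it is written are assumptions):

    -define(cbm(DB), {emqx_ds_buffer_callback_module, DB}).

    %% Presumed registration, e.g. from the buffer worker's init/1:
    register_callback(DB, CallbackModule, CallbackOptions) ->
        persistent_term:put(?cbm(DB), {CallbackModule, CallbackOptions}).

    %% Lookup used by shard_of_message/3:
    lookup_callback(DB) ->
        persistent_term:get(?cbm(DB)).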
@@ -107,7 +112,7 @@ store_batch(DB, Messages, Opts) ->
     callback_module :: module(),
     callback_state :: term(),
     db :: emqx_ds:db(),
-    shard :: emqx_ds_replication_layer:shard_id(),
+    shard :: _ShardId,
     metrics_id :: emqx_ds_builtin_metrics:shard_metrics_id(),
     n_retries = 0 :: non_neg_integer(),
     %% FIXME: Currently max_retries is always 0, because replication
@@ -124,7 +129,7 @@ store_batch(DB, Messages, Opts) ->
 init([CBM, CBMOptions, DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
-    logger:update_process_metadata(#{domain => [emqx, ds, egress, DB]}),
+    logger:update_process_metadata(#{domain => [emqx, ds, buffer, DB]}),
     MetricsId = emqx_ds_builtin_metrics:shard_metric_id(DB, Shard),
     ok = emqx_ds_builtin_metrics:init_for_shard(MetricsId),
     {ok, CallbackS} = CBM:init_buffer(DB, Shard, CBMOptions),
@@ -236,10 +241,6 @@ enqueue(
             end
     end.
 
-shard_of_message(DB, Message, ShardBy) ->
-    {CBM, Options} = persistent_term:get(?cbm(DB)),
-    CBM:shard_of_message(DB, Message, ShardBy, Options).
-
 -define(COOLDOWN_MIN, 1000).
 -define(COOLDOWN_MAX, 5000).
 
@@ -273,7 +274,7 @@ do_flush(
             emqx_ds_builtin_metrics:inc_egress_messages(Metrics, S#s.n),
             emqx_ds_builtin_metrics:inc_egress_bytes(Metrics, S#s.n_bytes),
             ?tp(
-                emqx_ds_replication_layer_egress_flush,
+                emqx_ds_buffer_flush,
                 #{db => DB, shard => Shard, batch => Messages}
             ),
             lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
@@ -285,7 +286,7 @@ do_flush(
                 queue = queue:new(),
                 pending_replies = []
             };
-        {timeout, ServerId} when Retries < MaxRetries ->
+        {error, recoverable, Err} when Retries < MaxRetries ->
             %% Note: this is a hot loop, so we report error messages
             %% with `debug' level to avoid wiping the logs. Instead,
             %% error the detection must rely on the metrics. Debug
@@ -293,8 +294,8 @@ do_flush(
             %% via logger domain.
             ?tp(
                 debug,
-                emqx_ds_replication_layer_egress_flush_retry,
-                #{db => DB, shard => Shard, reason => timeout, server_id => ServerId}
+                emqx_ds_buffer_flush_retry,
+                #{db => DB, shard => Shard, reason => Err}
             ),
             %% Retry sending the batch:
             emqx_ds_builtin_metrics:inc_egress_batches_retry(Metrics),
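The retry clause above no longer matches the Raft-specific {timeout, ServerId} tuple: the callback's flush_buffer/4 is now expected to classify failures as recoverable or unrecoverable before the result reaches this case expression. A minimal sketch of that classification contract, using the hypothetical helper name handle_flush_result/1:

    %% Sketch: how the generic buffer presumably treats a normalized flush result.
    handle_flush_result(ok) ->
        %% success: reply `ok' to pending callers and reset the buffer
        flushed;
    handle_flush_result({error, recoverable, Reason}) ->
        %% transient failure (e.g. a leader timeout): keep the batch and retry later
        {retry, Reason};
    handle_flush_result({error, unrecoverable, Reason}) ->
        %% permanent failure: reply with the error and drop the batch
        {failed, Reason}.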
@@ -306,7 +307,7 @@ do_flush(
         Err ->
             ?tp(
                 debug,
-                emqx_ds_replication_layer_egress_flush_failed,
+                emqx_ds_buffer_flush_failed,
                 #{db => DB, shard => Shard, error => Err}
             ),
             emqx_ds_builtin_metrics:inc_egress_batches_failed(Metrics),
@@ -330,7 +331,7 @@ do_flush(
     end.
 
 -spec shards_of_batch(emqx_ds:db(), [emqx_types:message()]) ->
-    [{emqx_ds_replication_layer:shard_id(), {NMessages, NBytes}}]
+    [{_ShardId, {NMessages, NBytes}}]
 when
     NMessages :: non_neg_integer(),
     NBytes :: non_neg_integer().
@@ -276,7 +276,7 @@ t_atomic_store_batch(_Config) ->
             %% Must contain exactly one flush with all messages.
             ?assertMatch(
                 [#{batch := [_, _, _]}],
-                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+                ?of_kind(emqx_ds_buffer_flush, Trace)
             ),
             ok
         end
@@ -305,7 +305,7 @@ t_non_atomic_store_batch(_Config) ->
         end,
         fun(ExpectedMsgs, Trace) ->
             ProcessedMsgs = lists:append(
-                ?projection(batch, ?of_kind(emqx_ds_replication_layer_egress_flush, Trace))
+                ?projection(batch, ?of_kind(emqx_ds_buffer_flush, Trace))
             ),
             ?assertEqual(
                 ExpectedMsgs,
@@ -310,7 +310,7 @@ nodes_of_clientid(DB, ClientId, Nodes = [N0 | _]) ->
 shard_of_clientid(DB, Node, ClientId) ->
     ?ON(
         Node,
-        emqx_ds_replication_layer:shard_of_message(DB, #message{from = ClientId}, clientid)
+        emqx_ds_buffer:shard_of_message(DB, #message{from = ClientId}, clientid)
     ).
 
 %% Consume eagerly: