fix(ds): Don't reverse entries in the atomic batch

This commit is contained in:
ieQu1 2024-03-25 20:58:24 +01:00
parent 606f2a88cd
commit 044f3d4ef5
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
1 changed files with 27 additions and 27 deletions

View File

@ -130,7 +130,7 @@ store_batch(DB, Messages, Opts) ->
n = 0 :: non_neg_integer(), n = 0 :: non_neg_integer(),
n_bytes = 0 :: non_neg_integer(), n_bytes = 0 :: non_neg_integer(),
tref :: reference(), tref :: reference(),
batch = [] :: [emqx_types:message()], queue :: queue:queue(emqx_types:message()),
pending_replies = [] :: [gen_server:from()] pending_replies = [] :: [gen_server:from()]
}). }).
@ -143,7 +143,8 @@ init([DB, Shard]) ->
db = DB, db = DB,
shard = Shard, shard = Shard,
metrics_id = MetricsId, metrics_id = MetricsId,
tref = start_timer() tref = start_timer(),
queue = queue:new()
}, },
{ok, S}. {ok, S}.
@ -151,7 +152,7 @@ handle_call(#enqueue_req{message = Msg, sync = Sync, payload_bytes = NBytes}, Fr
do_enqueue(From, Sync, Msg, NBytes, S); do_enqueue(From, Sync, Msg, NBytes, S);
handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) -> handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync, payload_bytes = NBytes}, From, S) ->
Len = length(Batch), Len = length(Batch),
do_enqueue(From, Sync, {atomic, Len, NBytes, Batch}, NBytes, S); do_enqueue(From, Sync, {atomic, Len, Batch}, NBytes, S);
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}. {reply, {error, unknown_call}, S}.
@ -177,63 +178,59 @@ terminate(_Reason, _S) ->
-define(COOLDOWN_MIN, 1000). -define(COOLDOWN_MIN, 1000).
-define(COOLDOWN_MAX, 5000). -define(COOLDOWN_MAX, 5000).
do_flush(S = #s{batch = []}) ->
S#s{tref = start_timer()};
do_flush( do_flush(
S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard} S = #s{queue = Q, pending_replies = Replies, db = DB, shard = Shard}
) -> ) ->
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)) of Messages = queue:to_list(Q),
case emqx_ds_replication_layer:ra_store_batch(DB, Shard, Messages) of
ok -> ok ->
emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id), emqx_ds_builtin_metrics:inc_egress_batches(S#s.metrics_id),
emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n), emqx_ds_builtin_metrics:inc_egress_messages(S#s.metrics_id, S#s.n),
emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes), emqx_ds_builtin_metrics:inc_egress_bytes(S#s.metrics_id, S#s.n_bytes),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
true = erlang:garbage_collect(),
?tp( ?tp(
emqx_ds_replication_layer_egress_flush, emqx_ds_replication_layer_egress_flush,
#{db => DB, shard => Shard, batch => Messages} #{db => DB, shard => Shard, batch => Messages}
), ),
lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies), lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
true = erlang:garbage_collect(), erlang:garbage_collect(),
ok; S#s{
n = 0,
n_bytes = 0,
queue = queue:new(),
pending_replies = [],
tref = start_timer()
};
Error -> Error ->
emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id), emqx_ds_builtin_metrics:inc_egress_batches_retry(S#s.metrics_id),
true = erlang:garbage_collect(), erlang:garbage_collect(),
?tp( ?tp(
warning, warning,
emqx_ds_replication_layer_egress_flush_failed, emqx_ds_replication_layer_egress_flush_failed,
#{db => DB, shard => Shard, reason => Error} #{db => DB, shard => Shard, reason => Error}
), ),
Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN), Cooldown = ?COOLDOWN_MIN + rand:uniform(?COOLDOWN_MAX - ?COOLDOWN_MIN),
ok = timer:sleep(Cooldown), S#s{
%% Since we drop the entire batch here, we at least reply callers with an tref = start_timer(Cooldown)
%% error so they don't hang indefinitely in the `gen_server' call with }
%% `infinity' timeout. end.
lists:foreach(fun(From) -> gen_server:reply(From, {error, Error}) end, Replies)
end,
S#s{
n = 0,
n_bytes = 0,
batch = [],
pending_replies = [],
tref = start_timer()
}.
do_enqueue( do_enqueue(
From, From,
Sync, Sync,
MsgOrBatch, MsgOrBatch,
BatchBytes, BatchBytes,
S0 = #s{n = N, n_bytes = NBytes0, batch = Batch, pending_replies = Replies} S0 = #s{n = N, n_bytes = NBytes0, queue = Q0, pending_replies = Replies}
) -> ) ->
NBytes = NBytes0 + BatchBytes, NBytes = NBytes0 + BatchBytes,
NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
S1 = S1 =
case MsgOrBatch of case MsgOrBatch of
{atomic, NumMsgs, Msgs} -> {atomic, NumMsgs, Msgs} ->
S0#s{n = N + NumMsgs, n_bytes = NBytes, batch = Msgs ++ Batch}; Q = lists:foldl(fun queue:in/2, Q0, Msgs),
S0#s{n = N + NumMsgs, n_bytes = NBytes, queue = Q};
Msg -> Msg ->
S0#s{n = N + 1, n_bytes = NBytes, batch = [Msg | Batch]} S0#s{n = N + 1, n_bytes = NBytes, queue = queue:in(Msg, Q0)}
end, end,
%% TODO: later we may want to delay the reply until the message is %% TODO: later we may want to delay the reply until the message is
%% replicated, but it requies changes to the PUBACK/PUBREC flow to %% replicated, but it requies changes to the PUBACK/PUBREC flow to
@ -264,6 +261,9 @@ do_enqueue(
start_timer() -> start_timer() ->
Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
start_timer(Interval).
start_timer(Interval) ->
erlang:send_after(Interval, self(), ?flush). erlang:send_after(Interval, self(), ?flush).
%% @doc Return approximate size of the MQTT message (it doesn't take %% @doc Return approximate size of the MQTT message (it doesn't take