feat(ds): Make egress batching configurable

This commit is contained in:
ieQu1 2024-01-23 21:29:37 +01:00
parent 137535a821
commit eee221f1d0
2 changed files with 21 additions and 3 deletions

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -1915,6 +1915,24 @@ fields("session_storage_backend_builtin") ->
default => 3, default => 3,
importance => ?IMPORTANCE_HIDDEN importance => ?IMPORTANCE_HIDDEN
} }
)},
{"egress_batch_size",
sc(
pos_integer(),
#{
default => 1000,
mapping => "emqx_durable_storage.egress_batch_size",
importance => ?IMPORTANCE_HIDDEN
}
)},
{"egress_flush_interval",
sc(
timeout_duration_ms(),
#{
default => 100,
mapping => "emqx_durable_storage.egress_flush_interval",
importance => ?IMPORTANCE_HIDDEN
}
)} )}
]. ].

View File

@ -141,7 +141,7 @@ do_flush(
}. }.
do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
NMax = 1000, NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
S2 = S2 =
case N >= NMax of case N >= NMax of
@ -171,5 +171,5 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl
{noreply, S}. {noreply, S}.
start_timer() -> start_timer() ->
Interval = 10, Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
erlang:send_after(Interval, self(), flush). erlang:send_after(Interval, self(), flush).