diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index bbca13172..b03cfe72e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2017-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -1915,6 +1915,24 @@ fields("session_storage_backend_builtin") -> default => 3, importance => ?IMPORTANCE_HIDDEN } + )}, + {"egress_batch_size", + sc( + pos_integer(), + #{ + default => 1000, + mapping => "emqx_durable_storage.egress_batch_size", + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"egress_flush_interval", + sc( + timeout_duration_ms(), + #{ + default => 100, + mapping => "emqx_durable_storage.egress_flush_interval", + importance => ?IMPORTANCE_HIDDEN + } )} ]. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 3b264d9d1..842e8e5ed 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -141,7 +141,7 @@ do_flush( }. do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) -> - NMax = 1000, + NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000), S1 = S0#s{n = N + 1, batch = [Msg | Batch]}, S2 = case N >= NMax of @@ -171,5 +171,5 @@ do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Repl {noreply, S}. start_timer() -> - Interval = 10, + Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100), erlang:send_after(Interval, self(), flush).