diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index fb8170904..2bd312561 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -169,7 +169,8 @@ commit_offset( -spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> {emqx_session:replies(), inflight()}. poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> - FetchThreshold = max(1, WindowSize div 2), + MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), + FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), FreeSpace = WindowSize - n_inflight(Inflight0), case FreeSpace >= FetchThreshold of false -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 11801b098..b50ac8c64 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -407,6 +407,8 @@ handle_timeout( ?TIMER_PULL, Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} ) -> + MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), + BatchSize = min(ReceiveMaximum, MaxBatchSize), {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( fun (_Seqno, Message = #message{qos = ?QOS_0}) -> @@ -417,7 +419,7 @@ handle_timeout( end, Id, Inflight0, - ReceiveMaximum + BatchSize ), IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), Timeout = diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f46387d3b..cdb1035df 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1773,6 +1773,22 @@ fields("session_persistence") -> } } )}, + {"max_batch_size", + sc( + pos_integer(), + #{ + default => 1000, + desc => ?DESC(session_ds_max_batch_size) + } + )}, + {"min_batch_size", + sc( + pos_integer(), + #{ + default => 100, + desc => ?DESC(session_ds_min_batch_size) + } + )}, {"idle_poll_interval", sc( timeout_duration(), diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 2a6fb03ba..96c9c5824 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1577,4 +1577,16 @@ session_ds_session_gc_interval.desc: session_ds_session_gc_batch_size.desc: """The size of each batch of expired persistent sessions to be garbage collected per iteration.""" +session_ds_max_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session queries the DB for the new messages in batches. +Size of the batch doesn't exceed this value or `RecieveMaximum`, whichever is smaller.""" + +session_ds_min_batch_size.desc: +"""This value affects the flow control for the persistent sessions. +The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller. + +FreeSpace is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" + + }