feat(sessds): Make batch size configurable

This change affects flow control. It allows to configure maximum size
of a batch, as well as fetch threshold.
This commit is contained in:
ieQu1 2023-12-01 03:29:41 +01:00
parent 4717e56fb6
commit 0e625d814a
4 changed files with 33 additions and 2 deletions

View File

@ -169,7 +169,8 @@ commit_offset(
-spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> -spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) ->
{emqx_session:replies(), inflight()}. {emqx_session:replies(), inflight()}.
poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE ->
FetchThreshold = max(1, WindowSize div 2), MinBatchSize = emqx_config:get([session_persistence, min_batch_size]),
FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)),
FreeSpace = WindowSize - n_inflight(Inflight0), FreeSpace = WindowSize - n_inflight(Inflight0),
case FreeSpace >= FetchThreshold of case FreeSpace >= FetchThreshold of
false -> false ->

View File

@ -407,6 +407,8 @@ handle_timeout(
?TIMER_PULL, ?TIMER_PULL,
Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum}
) -> ) ->
MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]),
BatchSize = min(ReceiveMaximum, MaxBatchSize),
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(
fun fun
(_Seqno, Message = #message{qos = ?QOS_0}) -> (_Seqno, Message = #message{qos = ?QOS_0}) ->
@ -417,7 +419,7 @@ handle_timeout(
end, end,
Id, Id,
Inflight0, Inflight0,
ReceiveMaximum BatchSize
), ),
IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]), IdlePollInterval = emqx_config:get([session_persistence, idle_poll_interval]),
Timeout = Timeout =

View File

@ -1773,6 +1773,22 @@ fields("session_persistence") ->
} }
} }
)}, )},
{"max_batch_size",
sc(
pos_integer(),
#{
default => 1000,
desc => ?DESC(session_ds_max_batch_size)
}
)},
{"min_batch_size",
sc(
pos_integer(),
#{
default => 100,
desc => ?DESC(session_ds_min_batch_size)
}
)},
{"idle_poll_interval", {"idle_poll_interval",
sc( sc(
timeout_duration(), timeout_duration(),

View File

@ -1577,4 +1577,16 @@ session_ds_session_gc_interval.desc:
session_ds_session_gc_batch_size.desc: session_ds_session_gc_batch_size.desc:
"""The size of each batch of expired persistent sessions to be garbage collected per iteration.""" """The size of each batch of expired persistent sessions to be garbage collected per iteration."""
session_ds_max_batch_size.desc:
"""This value affects the flow control for the persistent sessions.
The session queries the DB for the new messages in batches.
Size of the batch doesn't exceed this value or `RecieveMaximum`, whichever is smaller."""
session_ds_min_batch_size.desc:
"""This value affects the flow control for the persistent sessions.
The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller.
FreeSpace is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
} }