fix(sessds): Replace min- and max- batch size with batch_size
This commit is contained in:
parent
94b0ab983d
commit
91ddbbcc3f
|
@ -733,7 +733,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) ->
|
||||||
fetch_new_messages([], Session, _ClientInfo) ->
|
fetch_new_messages([], Session, _ClientInfo) ->
|
||||||
Session;
|
Session;
|
||||||
fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
|
fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
|
||||||
BatchSize = emqx_config:get([session_persistence, max_batch_size]),
|
BatchSize = emqx_config:get([session_persistence, batch_size]),
|
||||||
case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
|
case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
|
||||||
true ->
|
true ->
|
||||||
%% Buffer is full:
|
%% Buffer is full:
|
||||||
|
|
|
@ -258,7 +258,10 @@ roots(medium) ->
|
||||||
{"durable_storage",
|
{"durable_storage",
|
||||||
sc(
|
sc(
|
||||||
ref("durable_storage"),
|
ref("durable_storage"),
|
||||||
#{importance => ?IMPORTANCE_MEDIUM}
|
#{
|
||||||
|
importance => ?IMPORTANCE_MEDIUM,
|
||||||
|
desc => ?DESC(durable_storage)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
roots(low) ->
|
roots(low) ->
|
||||||
|
@ -1659,20 +1662,33 @@ fields("session_persistence") ->
|
||||||
default => false
|
default => false
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{"batch_size",
|
||||||
|
sc(
|
||||||
|
pos_integer(),
|
||||||
|
#{
|
||||||
|
default => 100,
|
||||||
|
desc => ?DESC(session_ds_batch_size),
|
||||||
|
importance => ?IMPORTANCE_MEDIUM
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
%% Deprecated, now the replayer always use constant batch size:
|
||||||
{"max_batch_size",
|
{"max_batch_size",
|
||||||
sc(
|
sc(
|
||||||
pos_integer(),
|
pos_integer(),
|
||||||
#{
|
#{
|
||||||
default => 100,
|
default => 100,
|
||||||
desc => ?DESC(session_ds_max_batch_size)
|
desc => ?DESC(session_ds_max_batch_size),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
%% Deprecated, now the replayer always use constant batch size:
|
||||||
{"min_batch_size",
|
{"min_batch_size",
|
||||||
sc(
|
sc(
|
||||||
pos_integer(),
|
pos_integer(),
|
||||||
#{
|
#{
|
||||||
default => 100,
|
default => 100,
|
||||||
desc => ?DESC(session_ds_min_batch_size)
|
desc => ?DESC(session_ds_min_batch_size),
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{"idle_poll_interval",
|
{"idle_poll_interval",
|
||||||
|
|
|
@ -1531,29 +1531,46 @@ resource_tags.label:
|
||||||
resource_tags.desc:
|
resource_tags.desc:
|
||||||
"""Tags to annotate this config entry."""
|
"""Tags to annotate this config entry."""
|
||||||
|
|
||||||
|
session_persistence_enable.label:
|
||||||
|
"""Enable session persistence"""
|
||||||
|
|
||||||
session_persistence_enable.desc:
|
session_persistence_enable.desc:
|
||||||
"""Use durable storage for client sessions persistence.
|
"""Use durable storage for client sessions persistence.
|
||||||
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime."""
|
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.
|
||||||
|
|
||||||
|
:::warning
|
||||||
|
This feature is currently experimental. Please don't enable it in the producation environments that contain valuable data.
|
||||||
|
:::"""
|
||||||
|
|
||||||
|
|
||||||
|
session_ds_session_gc_interval.label:
|
||||||
|
"""Session garbage collection interval"""
|
||||||
|
|
||||||
session_ds_session_gc_interval.desc:
|
session_ds_session_gc_interval.desc:
|
||||||
"""The interval at which session garbage collection is executed for persistent sessions."""
|
"""The interval at which session garbage collection is executed for persistent sessions."""
|
||||||
|
|
||||||
|
session_ds_session_gc_batch_size.label:
|
||||||
|
"""Session garbage collection batch size"""
|
||||||
|
|
||||||
session_ds_session_gc_batch_size.desc:
|
session_ds_session_gc_batch_size.desc:
|
||||||
"""The size of each batch of expired persistent sessions to be garbage collected per iteration."""
|
"""The size of each batch of expired persistent sessions to be garbage collected per iteration."""
|
||||||
|
|
||||||
session_ds_max_batch_size.desc:
|
session_ds_batch_size.label:
|
||||||
|
"""Batch size"""
|
||||||
|
|
||||||
|
session_ds_batch_size.desc:
|
||||||
"""This value affects the flow control for the persistent sessions.
|
"""This value affects the flow control for the persistent sessions.
|
||||||
The session queries the DB for the new messages in batches.
|
Persistent session queries the durable message storage in batches.
|
||||||
Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller."""
|
This value specifies size of the batch.
|
||||||
|
|
||||||
session_ds_min_batch_size.desc:
|
Note: larger batches generally improve the throughput and overall performance of the system, but increase RAM usage per client."""
|
||||||
"""This value affects the flow control for the persistent sessions.
|
|
||||||
The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller.
|
|
||||||
|
|
||||||
`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
|
durable_storage.label:
|
||||||
|
"""Durable storage"""
|
||||||
|
|
||||||
session_ds_message_retention_period.desc:
|
durable_storage.desc:
|
||||||
"""The minimum amount of time that messages should be retained for. After messages have been in storage for at least this period of time, they'll be dropped."""
|
"""Configuration related to the EMQX durable storages.
|
||||||
|
|
||||||
|
EMQX uses durable storages to offload various data, such as MQTT messages, to disc."""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue