From f611cbab45245e1756fc000a11ceecacae5a649a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 19 Dec 2022 23:16:05 +0100 Subject: [PATCH] chore: cap replayq seg size under total size --- apps/emqx_resource/src/emqx_resource_worker.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 6722e1b43..482c82f6a 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -117,13 +117,16 @@ init({Id, Index, Opts}) -> true = gproc_pool:connect_worker(Id, {Id, Index}), Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), + SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), + TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + SegBytes = min(SegBytes0, TotalBytes), Queue = case maps:get(enable_queue, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), - seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), - max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + seg_bytes => SegBytes, + max_total_bytes => TotalBytes, sizer => fun ?MODULE:estimate_size/1, marshaller => fun ?MODULE:queue_item_marshaller/1 });