diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index a8ae4454d..ac22e1c48 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -196,13 +196,16 @@ init({Id, Index, Opts}) ->
     InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
     InflightTID = inflight_new(InflightWinSize, Id, Index),
     HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
+    RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
+    BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+    BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
     Data = #{
         id => Id,
         index => Index,
         inflight_tid => InflightTID,
         async_workers => #{},
         batch_size => BatchSize,
-        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
+        batch_time => BatchTime,
         queue => Queue,
         resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
         tref => undefined
@@ -1639,3 +1642,56 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
 -else.
 do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
 -endif.
+
+%% To avoid message loss due to misconfigurations, we adjust
+%% `batch_time' based on `request_timeout'. If `batch_time' >
+%% `request_timeout', all requests will time out before being sent if
+%% the message rate is low. Even worse if `pool_size' is high.
+%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
+%% With an `infinity' request timeout there is nothing to cap against
+%% (and `infinity div 2' would raise `badarith'), so `batch_time' is
+%% returned unchanged.
+adjust_batch_time(_Id, infinity, BatchTime0) ->
+    BatchTime0;
+adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
+    BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
+    case BatchTime =:= BatchTime0 of
+        false ->
+            ?SLOG(info, #{
+                id => Id,
+                msg => adjusting_buffer_worker_batch_time,
+                new_batch_time => BatchTime
+            });
+        true ->
+            ok
+    end,
+    BatchTime.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+adjust_batch_time_test_() ->
+    %% `Id' only ends up in the log entry; any term will do
+    Id = some_id,
+    [
+        {"batch time smaller than request_time/2",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, 500, 100)
+            )},
+        {"batch time equal to request_time/2",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, 200, 100)
+            )},
+        {"batch time greater than request_time/2",
+            ?_assertEqual(
+                50,
+                adjust_batch_time(Id, 100, 100)
+            )},
+        {"request timeout is infinity",
+            ?_assertEqual(
+                100,
+                adjust_batch_time(Id, infinity, 100)
+            )}
+    ].
+-endif.