From cf4a46a78b64df9c05ddc7f235ffea2938e865f9 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 9 Nov 2023 14:22:32 +0100 Subject: [PATCH] fix(ds): Speed up polling for the new messages Poll immediately if the previous poll returned non-empty result --- .../src/emqx_persistent_message_ds_replayer.erl | 4 ++++ apps/emqx/src/emqx_persistent_session_ds.erl | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index d137891a2..3964ee4e3 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -28,6 +28,10 @@ -include("emqx_persistent_session_ds.hrl"). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + %%================================================================================ %% Type declarations %%================================================================================ diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index f3027f500..bf16567fd 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -344,7 +344,15 @@ deliver(_ClientInfo, _Delivers, Session) -> handle_timeout(_ClientInfo, pull, Session = #{id := Id, inflight := Inflight0}) -> WindowSize = 100, {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(Id, Inflight0, WindowSize), - ensure_timer(pull), + %% TODO: make these values configurable: + Timeout = + case Publishes of + [] -> + 100; + [_ | _] -> + 0 + end, + ensure_timer(pull, Timeout), {ok, Publishes, Session#{inflight => Inflight}}; handle_timeout(_ClientInfo, get_streams, Session = #{id := Id}) -> renew_streams(Id), @@ -714,5 +722,9 @@ ensure_timers() -> -spec ensure_timer(pull | get_streams) -> ok. ensure_timer(Type) -> - _ = emqx_utils:start_timer(100, {emqx_session, Type}), + ensure_timer(Type, 100). + +-spec ensure_timer(pull | get_streams, non_neg_integer()) -> ok. +ensure_timer(Type, Timeout) -> + _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok.