From c43b3eb535395c531a633a06ddd967f440912bf8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 6 Dec 2023 15:36:51 +0100 Subject: [PATCH 1/2] fix(sessds): Add debug logs for the session garbage collection --- apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl | 2 +- apps/emqx_durable_storage/src/emqx_ds.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index bf607804f..af387d2ca 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -82,7 +82,7 @@ try_gc() -> CoreNodes = mria_membership:running_core_nodelist(), Res = global:trans( {?MODULE, self()}, - fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end, + fun() -> ?tp_span(debug, ds_session_gc, #{}, start_gc()) end, CoreNodes, %% Note: we set retries to 1 here because, in rare occasions, GC might start at the %% same time in more than one node, and each one will abort the other. By allowing diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 649341eb5..6b371cf79 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -195,7 +195,7 @@ store_batch(DB, Msgs) -> %% replay. This function returns stream together with its %% "coordinate": `stream_rank()'. %% -%% Stream rank is a tuple of two integers, let's call them X and Y. If +%% Stream rank is a tuple of two terms, let's call them X and Y. If %% X coordinate of two streams is different, they are independent and %% can be replayed in parallel. If it's the same, then the stream with %% smaller Y coordinate should be replayed first. If Y coordinates are From 371ec349928e510c996bc91d98e86af9c73024e3 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 6 Dec 2023 15:57:56 +0100 Subject: [PATCH 2/2] feat(sessds): Make stream renew interval configurable --- apps/emqx/src/emqx_persistent_session_ds.erl | 3 ++- apps/emqx/src/emqx_schema.erl | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 37385c12c..443c6c39d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -429,7 +429,8 @@ handle_timeout( {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) -> renew_streams(Session), - {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)}; + Interval = emqx_config:get([session_persistence, renew_streams_interval]), + {ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, Interval, Session)}; handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cdb1035df..d915becc0 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1805,6 +1805,14 @@ fields("session_persistence") -> desc => ?DESC(session_ds_last_alive_update_interval) } )}, + {"renew_streams_interval", + sc( + timeout_duration(), + #{ + default => <<"5000ms">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"session_gc_interval", sc( timeout_duration(),