Merge pull request #12119 from ieQu1/EMQX-10342
Make stream renew timer configurable
This commit is contained in:
commit
9b612cb0e7
|
@ -429,7 +429,8 @@ handle_timeout(
|
||||||
{ok, Publishes, Session};
|
{ok, Publishes, Session};
|
||||||
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
|
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session) ->
|
||||||
renew_streams(Session),
|
renew_streams(Session),
|
||||||
{ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, 100, Session)};
|
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
|
||||||
|
{ok, [], emqx_session:ensure_timer(?TIMER_GET_STREAMS, Interval, Session)};
|
||||||
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
|
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
|
||||||
%% Note: we take a pessimistic approach here and assume that the client will be alive
|
%% Note: we take a pessimistic approach here and assume that the client will be alive
|
||||||
%% until the next bump timeout. With this, we avoid garbage collecting this session
|
%% until the next bump timeout. With this, we avoid garbage collecting this session
|
||||||
|
|
|
@ -82,7 +82,7 @@ try_gc() ->
|
||||||
CoreNodes = mria_membership:running_core_nodelist(),
|
CoreNodes = mria_membership:running_core_nodelist(),
|
||||||
Res = global:trans(
|
Res = global:trans(
|
||||||
{?MODULE, self()},
|
{?MODULE, self()},
|
||||||
fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end,
|
fun() -> ?tp_span(debug, ds_session_gc, #{}, start_gc()) end,
|
||||||
CoreNodes,
|
CoreNodes,
|
||||||
%% Note: we set retries to 1 here because, in rare occasions, GC might start at the
|
%% Note: we set retries to 1 here because, in rare occasions, GC might start at the
|
||||||
%% same time in more than one node, and each one will abort the other. By allowing
|
%% same time in more than one node, and each one will abort the other. By allowing
|
||||||
|
|
|
@ -1805,6 +1805,14 @@ fields("session_persistence") ->
|
||||||
desc => ?DESC(session_ds_last_alive_update_interval)
|
desc => ?DESC(session_ds_last_alive_update_interval)
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
|
{"renew_streams_interval",
|
||||||
|
sc(
|
||||||
|
timeout_duration(),
|
||||||
|
#{
|
||||||
|
default => <<"5000ms">>,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
|
)},
|
||||||
{"session_gc_interval",
|
{"session_gc_interval",
|
||||||
sc(
|
sc(
|
||||||
timeout_duration(),
|
timeout_duration(),
|
||||||
|
|
|
@ -195,7 +195,7 @@ store_batch(DB, Msgs) ->
|
||||||
%% replay. This function returns stream together with its
|
%% replay. This function returns stream together with its
|
||||||
%% "coordinate": `stream_rank()'.
|
%% "coordinate": `stream_rank()'.
|
||||||
%%
|
%%
|
||||||
%% Stream rank is a tuple of two integers, let's call them X and Y. If
|
%% Stream rank is a tuple of two terms, let's call them X and Y. If
|
||||||
%% X coordinate of two streams is different, they are independent and
|
%% X coordinate of two streams is different, they are independent and
|
||||||
%% can be replayed in parallel. If it's the same, then the stream with
|
%% can be replayed in parallel. If it's the same, then the stream with
|
||||||
%% smaller Y coordinate should be replayed first. If Y coordinates are
|
%% smaller Y coordinate should be replayed first. If Y coordinates are
|
||||||
|
|
Loading…
Reference in New Issue