From 663ea695742a7701becf352eb7d7ff0850d17e6d Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Tue, 26 Mar 2024 16:43:06 +0100
Subject: [PATCH] wip(sessds): taint streams temporarily during failures

When `enqueue_batch/5` returns `{error, Class, Reason}` for a stream in
`new_batch/4`, record the stream under the session's `taints` key together
with a deadline of `now_ms() + ?TIMEOUT_STREAM_TAINT`. While that deadline
is still ahead, `new_batch/4` skips the stream and returns the session
unchanged instead of calling `enqueue_batch/5` again.

Expired entries are pruned from `taints` on each `?TIMER_PULL` that fetches
new messages. `has_stream_taint/2` compares the stored deadline itself, so
an expired entry that has not been pruned yet no longer blocks the stream.

Additionally, when a pull produced publishes, the pull timer is re-armed
after `idle_poll_interval div length(Publishes)` instead of `0`.

---
 apps/emqx/src/emqx_persistent_session_ds.erl | 35 ++++++++++++++++++++++++++++++++---
 1 file changed, 32 insertions(+), 3 deletions(-)

diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl
index cc861f71f..1c727f6c9 100644
--- a/apps/emqx/src/emqx_persistent_session_ds.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds.erl
@@ -136,6 +136,8 @@
 %% TODO: Needs configuration?
 -define(TIMEOUT_RETRY_REPLAY, 1000).
 
+-define(TIMEOUT_STREAM_TAINT, 1000).
+
 -type session() :: #{
     %% Client ID
     id := id(),
@@ -502,7 +504,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
     {Publishes, Session1} =
         case ?IS_REPLAY_ONGOING(Session0) of
             false ->
-                drain_buffer(fetch_new_messages(Session0, ClientInfo));
+                drain_buffer(clear_stream_taints(fetch_new_messages(Session0, ClientInfo)));
             true ->
                 {[], Session0}
         end,
@@ -511,7 +513,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
             [] ->
                 get_config(ClientInfo, [idle_poll_interval]);
             [_ | _] ->
-                0
+                get_config(ClientInfo, [idle_poll_interval]) div length(Publishes)
         end,
     Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
     {ok, Publishes, Session};
@@ -837,7 +839,8 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
         last_seqno_qos1 = SN1,
         last_seqno_qos2 = SN2
     },
-    case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
+    Tainted = has_stream_taint(StreamKey, Session0),
+    case Tainted orelse enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
         {ok, Srs, Session} ->
             S1 = emqx_persistent_session_ds_state:put_seqno(
                 ?next(?QOS_1),
@@ -859,6 +862,9 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
                 reason => Reason,
                 class => Class
             }),
+            taint_stream(StreamKey, Session0);
+        true ->
+            %% Stream is tainted, skip for now.
             Session0
     end.
 
@@ -980,6 +986,29 @@ process_batch(
         IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
     ).
 
+%% Record a failure for the stream: store a deadline of
+%% `now_ms() + ?TIMEOUT_STREAM_TAINT` under the session's `taints` key.
+taint_stream(StreamKey, Session) ->
+    Taints = maps:get(taints, Session, #{}),
+    Until = now_ms() + ?TIMEOUT_STREAM_TAINT,
+    Session#{taints => Taints#{StreamKey => Until}}.
+
+%% A stream is tainted only while its recorded deadline is still ahead;
+%% an entry that has expired but was not pruned yet does not count.
+has_stream_taint(StreamKey, Session) ->
+    case maps:get(taints, Session, #{}) of
+        #{StreamKey := Until} -> Until > now_ms();
+        #{} -> false
+    end.
+
+%% Drop every taint whose deadline has passed; sessions without `taints` are returned as is.
+clear_stream_taints(Session = #{taints := Taints0}) ->
+    Now = now_ms(),
+    Taints = maps:filter(fun(_StreamKey, Until) -> Until > Now end, Taints0),
+    Session#{taints := Taints};
+clear_stream_taints(Session = #{}) ->
+    Session.
+
 %%--------------------------------------------------------------------
 %% Transient messages
 %%--------------------------------------------------------------------