wip(sessds): taint streams temporarily during failures

This commit is contained in:
Andrew Mayorov 2024-03-26 16:43:06 +01:00
parent 0e36f7afa4
commit 663ea69574
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 25 additions and 3 deletions

View File

@ -136,6 +136,8 @@
%% TODO: Needs configuration?
-define(TIMEOUT_RETRY_REPLAY, 1000).
-define(TIMEOUT_STREAM_TAINT, 1000).
-type session() :: #{
%% Client ID
id := id(),
@ -502,7 +504,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
{Publishes, Session1} =
case ?IS_REPLAY_ONGOING(Session0) of
false ->
drain_buffer(fetch_new_messages(Session0, ClientInfo));
drain_buffer(clear_stream_taints(fetch_new_messages(Session0, ClientInfo)));
true ->
{[], Session0}
end,
@ -511,7 +513,7 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
[] ->
get_config(ClientInfo, [idle_poll_interval]);
[_ | _] ->
0
get_config(ClientInfo, [idle_poll_interval]) div length(Publishes)
end,
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
{ok, Publishes, Session};
@ -837,7 +839,8 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
last_seqno_qos1 = SN1,
last_seqno_qos2 = SN2
},
case enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
Tainted = has_stream_taint(StreamKey, Session0),
case Tainted orelse enqueue_batch(false, BatchSize, Srs1, Session0, ClientInfo) of
{ok, Srs, Session} ->
S1 = emqx_persistent_session_ds_state:put_seqno(
?next(?QOS_1),
@ -859,6 +862,9 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
reason => Reason,
class => Class
}),
taint_stream(StreamKey, Session0);
Tainted = true ->
%% Stream is tainted, skip for now.
Session0
end.
@ -980,6 +986,22 @@ process_batch(
IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
).
taint_stream(StreamKey, Session) ->
Taints = maps:get(taints, Session, #{}),
Until = now_ms() + ?TIMEOUT_STREAM_TAINT,
Session#{taints => Taints#{StreamKey => Until}}.
has_stream_taint(StreamKey, Session) ->
Taints = maps:get(taints, Session, #{}),
maps:is_key(StreamKey, Taints).
clear_stream_taints(Session = #{taints := Taints0}) ->
Now = now_ms(),
Taints = maps:filter(fun(_StreamKey, Until) -> Until > Now end, Taints0),
Session#{taints := Taints};
clear_stream_taints(Session = #{}) ->
Session.
%%--------------------------------------------------------------------
%% Transient messages
%%--------------------------------------------------------------------