From 30eb54e86b3a2e4233e68a46c71bb236e8684237 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:30:12 +0100 Subject: [PATCH] fix(sessds): Delay unsubscribe until full ack of in-flight messages --- apps/emqx/src/emqx_persistent_session_ds.hrl | 5 +- ...persistent_session_ds_stream_scheduler.erl | 70 ++++++++++++++++--- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index fa4bfacf1..8a24be31e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -62,7 +62,10 @@ first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), %% Sequence numbers that have to be committed for the batch: last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), - last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno() + last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), + %% This stream belongs to an unsubscribed topic-filter, and is + %% marked for deletion: + unsubscribed = false :: boolean() }). %% Session metadata keys: diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 6aa4ab005..745a3f948 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -93,7 +93,7 @@ find_new_streams(S) -> (_Key, #srs{it_end = end_of_stream}, Acc) -> Acc; (Key, Stream, Acc) -> - case is_fully_acked(Comm1, Comm2, Stream) of + case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of true -> [{Key, Stream} | Acc]; false -> @@ -124,25 +124,26 @@ find_new_streams(S) -> %% This way, messages from the same topic/shard are never reordered. -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). renew_streams(S0) -> - S1 = remove_fully_replayed_streams(S0), + S1 = remove_unsubscribed_streams(S0), + S2 = remove_fully_replayed_streams(S1), emqx_topic_gbt:fold( - fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, S2) -> + fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) -> TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), Streams = select_streams( SubId, emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), - S2 + Acc ), lists:foldl( - fun(I, Acc) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc) + fun(I, Acc1) -> + ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) end, - S2, + Acc, Streams ) end, - S1, - emqx_persistent_session_ds_state:get_subscriptions(S1) + S2, + emqx_persistent_session_ds_state:get_subscriptions(S2) ). -spec del_subscription( @@ -150,11 +151,33 @@ renew_streams(S0) -> ) -> emqx_persistent_session_ds_state:t(). del_subscription(SubId, S0) -> + %% NOTE: this function only marks the streams for deletion, + %% instead of outright deleting them. + %% + %% It's done for two reasons: + %% + %% - MQTT standard states that the broker MUST process acks for + %% all sent messages, and it MAY keep on sending buffered + %% messages: + %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901186 + %% + %% - Deleting the streams may lead to gaps in the sequence number + %% series, and lead to problems with acknowledgement tracking, we + %% avoid that by delaying the deletion. + %% + %% When the stream is marked for deletion, the session won't fetch + %% _new_ batches from it. Actual deletion is done by + %% `renew_streams', when it detects that all in-flight messages + %% from the stream have been acked by the client. emqx_persistent_session_ds_state:fold_streams( - fun(Key, _, Acc) -> + fun(Key, ReplayState, Acc) -> case Key of {SubId, _Stream} -> - emqx_persistent_session_ds_state:del_stream(Key, Acc); + %% This stream belongs to a deleted subscription. + %% Mark for deletion: + emqx_persistent_session_ds_state:put_stream( + Key, ReplayState#srs{unsubscribed = true}, Acc + ); _ -> Acc end @@ -184,6 +207,10 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + SRS = #srs{unsubscribed = true} -> + %% The session resubscribed to the stream after + %% unsubscribing. Spare the stream: + emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S); #srs{} -> S end. @@ -222,6 +249,27 @@ select_streams(SubId, RankX, Streams0, S) -> lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams) end. +%% @doc Remove fully acked streams for the deleted subscriptions. +-spec remove_unsubscribed_streams(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +remove_unsubscribed_streams(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + emqx_persistent_session_ds_state:fold_streams( + fun(Key, ReplayState, S1) -> + case + ReplayState#srs.unsubscribed andalso is_fully_acked(CommQos1, CommQos2, ReplayState) + of + true -> + emqx_persistent_session_ds_state:del_stream(Key, S1); + false -> + S1 + end + end, + S0, + S0 + ). + %% @doc Advance RankY for each RankX that doesn't have any unreplayed %% streams. %%