From a9c55f7568b37577928be7fbd12422cce47ab6c4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 6 Feb 2024 01:46:11 +0100 Subject: [PATCH 1/4] feat(sessds): Consider #srs with only QoS0 messages fully acked --- .../src/emqx_persistent_session_ds_stream_scheduler.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 45bf6ede1..6aa4ab005 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -168,7 +168,6 @@ del_subscription(SubId, S0) -> %%================================================================================ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> - %% TODO: hash collisions Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> @@ -303,6 +302,12 @@ compare_streams( is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) -> It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S). +is_fully_acked(_, _, #srs{ + first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2 +}) -> + %% Streams where the last chunk doesn't contain any QoS1 and 2 + %% messages are considered fully acked: + true; is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). From 30eb54e86b3a2e4233e68a46c71bb236e8684237 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:30:12 +0100 Subject: [PATCH 2/4] fix(sessds): Delay unsubscribe until full ack of in-flight messages --- apps/emqx/src/emqx_persistent_session_ds.hrl | 5 +- ...persistent_session_ds_stream_scheduler.erl | 70 ++++++++++++++++--- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index fa4bfacf1..8a24be31e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -62,7 +62,10 @@ first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), %% Sequence numbers that have to be committed for the batch: last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), - last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno() + last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), + %% This stream belongs to an unsubscribed topic-filter, and is + %% marked for deletion: + unsubscribed = false :: boolean() }). %% Session metadata keys: diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 6aa4ab005..745a3f948 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -93,7 +93,7 @@ find_new_streams(S) -> (_Key, #srs{it_end = end_of_stream}, Acc) -> Acc; (Key, Stream, Acc) -> - case is_fully_acked(Comm1, Comm2, Stream) of + case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of true -> [{Key, Stream} | Acc]; false -> @@ -124,25 +124,26 @@ find_new_streams(S) -> %% This way, messages from the same topic/shard are never reordered. -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). renew_streams(S0) -> - S1 = remove_fully_replayed_streams(S0), + S1 = remove_unsubscribed_streams(S0), + S2 = remove_fully_replayed_streams(S1), emqx_topic_gbt:fold( - fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, S2) -> + fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) -> TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), Streams = select_streams( SubId, emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), - S2 + Acc ), lists:foldl( - fun(I, Acc) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc) + fun(I, Acc1) -> + ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) end, - S2, + Acc, Streams ) end, - S1, - emqx_persistent_session_ds_state:get_subscriptions(S1) + S2, + emqx_persistent_session_ds_state:get_subscriptions(S2) ). -spec del_subscription( @@ -150,11 +151,33 @@ renew_streams(S0) -> ) -> emqx_persistent_session_ds_state:t(). del_subscription(SubId, S0) -> + %% NOTE: this function only marks the streams for deletion, + %% instead of outright deleting them. + %% + %% It's done for two reasons: + %% + %% - MQTT standard states that the broker MUST process acks for + %% all sent messages, and it MAY keep on sending buffered + %% messages: + %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901186 + %% + %% - Deleting the streams may lead to gaps in the sequence number + %% series, and lead to problems with acknowledgement tracking, we + %% avoid that by delaying the deletion. + %% + %% When the stream is marked for deletion, the session won't fetch + %% _new_ batches from it. Actual deletion is done by + %% `renew_streams', when it detects that all in-flight messages + %% from the stream have been acked by the client. emqx_persistent_session_ds_state:fold_streams( - fun(Key, _, Acc) -> + fun(Key, ReplayState, Acc) -> case Key of {SubId, _Stream} -> - emqx_persistent_session_ds_state:del_stream(Key, Acc); + %% This stream belongs to a deleted subscription. + %% Mark for deletion: + emqx_persistent_session_ds_state:put_stream( + Key, ReplayState#srs{unsubscribed = true}, Acc + ); _ -> Acc end @@ -184,6 +207,10 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + SRS = #srs{unsubscribed = true} -> + %% The session resubscribed to the stream after + %% unsubscribing. Spare the stream: + emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S); #srs{} -> S end. @@ -222,6 +249,27 @@ select_streams(SubId, RankX, Streams0, S) -> lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams) end. +%% @doc Remove fully acked streams for the deleted subscriptions. +-spec remove_unsubscribed_streams(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +remove_unsubscribed_streams(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + emqx_persistent_session_ds_state:fold_streams( + fun(Key, ReplayState, S1) -> + case + ReplayState#srs.unsubscribed andalso is_fully_acked(CommQos1, CommQos2, ReplayState) + of + true -> + emqx_persistent_session_ds_state:del_stream(Key, S1); + false -> + S1 + end + end, + S0, + S0 + ). + %% @doc Advance RankY for each RankX that doesn't have any unreplayed %% streams. %% From 3000a8f286b84b4cc38fdf95ef4ee8a7e57f5eac Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Feb 2024 23:48:14 +0100 Subject: [PATCH 3/4] fix(sessds): Postpone deletion of the subscription until fully acked --- apps/emqx/src/emqx_persistent_session_ds.erl | 69 ++++++++++++++-- ...persistent_session_ds_stream_scheduler.erl | 53 +++++++------ .../test/emqx_persistent_session_SUITE.erl | 79 ++++++++++++++++++- 3 files changed, 170 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index cf027bd47..d92e3cc24 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -117,7 +117,7 @@ id := subscription_id(), start_time := emqx_ds:time(), props := map(), - extra := map() + deleted := boolean() }. -define(TIMER_PULL, timer_pull). @@ -313,7 +313,8 @@ subscribe( Subscription = #{ start_time => now_ms(), props => SubOpts, - id => SubId + id => SubId, + deleted => false }, IsNew = true; Subscription0 = #{} -> @@ -343,12 +344,17 @@ unsubscribe( -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), +do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> + %% Note: we cannot delete the subscription immediately, since its + %% metadata can be used during replay (see `process_batch'). We + %% instead mark it as deleted, and let `subscription_gc' function + %% dispatch it later: + SubMeta = SubMeta0#{deleted => true}, + S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), - S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), + S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), ?tp_span( persistent_session_ds_subscription_route_delete, #{session_id => SessionId, topic_filter => TopicFilter}, @@ -459,7 +465,8 @@ handle_timeout( Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> - S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S0), + S1 = subscription_gc(S0), + S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), Interval = emqx_config:get([session_persistence, renew_streams_interval]), Session = emqx_session:ensure_timer( ?TIMER_GET_STREAMS, @@ -502,6 +509,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session0, Streams ), + logger:error("Replay streams: ~p~n~p", [Streams, Session]), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: @@ -897,9 +905,43 @@ do_drain_buffer(Inflight0, S0, Acc) -> %%-------------------------------------------------------------------------------- +%% @doc Remove subscriptions that have been marked for deletion, and +%% that don't have any unacked messages: +subscription_gc(S0) -> + subs_fold_all( + fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> + case Deleted andalso has_no_unacked_streams(SubId, S0) of + true -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + false -> + Acc + end + end, + S0, + S0 + ). + +has_no_unacked_streams(SubId, S) -> + emqx_persistent_session_ds_state:fold_streams( + fun + ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> + emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; + (_StreamKey, _Srs, Acc) -> + Acc + end, + true, + S + ). + +%% @doc It only returns subscriptions that haven't been marked for deletion: subs_lookup(TopicFilter, S) -> Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined). + case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of + #{deleted := true} -> + undefined; + Sub -> + Sub + end. subs_to_map(S) -> subs_fold( @@ -909,6 +951,19 @@ subs_to_map(S) -> ). subs_fold(Fun, AccIn, S) -> + subs_fold_all( + fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> + case Deleted of + true -> Acc; + false -> Fun(TopicFilter, Sub, Acc) + end + end, + AccIn, + S + ). + +%% @doc Iterate over all subscriptions, including the deleted ones: +subs_fold_all(Fun, AccIn, S) -> Subs = emqx_persistent_session_ds_state:get_subscriptions(S), emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 745a3f948..ae15f2bd6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -16,8 +16,8 @@ -module(emqx_persistent_session_ds_stream_scheduler). %% API: --export([find_new_streams/1, find_replay_streams/1]). --export([renew_streams/1, del_subscription/2]). +-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]). +-export([renew_streams/1, on_unsubscribe/2]). %% behavior callbacks: -export([]). @@ -127,30 +127,33 @@ renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), emqx_topic_gbt:fold( - fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) -> - TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), - Streams = select_streams( - SubId, - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + fun + (Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), + Streams = select_streams( + SubId, + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + Acc + ), + lists:foldl( + fun(I, Acc1) -> + ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) + end, + Acc, + Streams + ); + (_Key, _DeletedSubscription, Acc) -> Acc - ), - lists:foldl( - fun(I, Acc1) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) - end, - Acc, - Streams - ) end, S2, emqx_persistent_session_ds_state:get_subscriptions(S2) ). --spec del_subscription( +-spec on_unsubscribe( emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() ) -> emqx_persistent_session_ds_state:t(). -del_subscription(SubId, S0) -> +on_unsubscribe(SubId, S0) -> %% NOTE: this function only marks the streams for deletion, %% instead of outright deleting them. %% @@ -170,13 +173,13 @@ del_subscription(SubId, S0) -> %% `renew_streams', when it detects that all in-flight messages %% from the stream have been acked by the client. emqx_persistent_session_ds_state:fold_streams( - fun(Key, ReplayState, Acc) -> + fun(Key, Srs, Acc) -> case Key of {SubId, _Stream} -> %% This stream belongs to a deleted subscription. %% Mark for deletion: emqx_persistent_session_ds_state:put_stream( - Key, ReplayState#srs{unsubscribed = true}, Acc + Key, Srs#srs{unsubscribed = true}, Acc ); _ -> Acc @@ -186,6 +189,14 @@ del_subscription(SubId, S0) -> S0 ). +-spec is_fully_acked( + emqx_persistent_session_ds:stream_state(), emqx_persistent_session_ds_state:t() +) -> boolean(). +is_fully_acked(Srs, S) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), + is_fully_acked(CommQos1, CommQos2, Srs). + %%================================================================================ %% Internal functions %%================================================================================ @@ -207,10 +218,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); - SRS = #srs{unsubscribed = true} -> - %% The session resubscribed to the stream after - %% unsubscribing. Spare the stream: - emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S); #srs{} -> S end. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index bdd3e367f..f8ee11c08 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -278,7 +278,10 @@ publish_many(Messages) -> publish_many(Messages, WaitForUnregister) -> Fun = fun(Client, Message) -> - {ok, _} = emqtt:publish(Client, Message) + case emqtt:publish(Client, Message) of + ok -> ok; + {ok, _} -> ok + end end, do_publish(Messages, Fun, WaitForUnregister). @@ -1026,6 +1029,80 @@ t_unsubscribe(Config) -> ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ok = emqtt:disconnect(Client). +%% This testcase verifies that un-acked messages that were once sent +%% to the client are still retransmitted after the session +%% unsubscribes from the topic and reconnects. +t_unsubscribe_replay(Config) -> + ConnFun = ?config(conn_fun, Config), + TopicPrefix = ?config(topic, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}}, + {max_inflight, 10} + | Config + ], + {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]), + {ok, _} = emqtt:ConnFun(Sub), + %% 1. Make two subscriptions, one is to be deleted: + Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]), + Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)), + %% 2. Publish 2 messages to the first and second topics each + %% (client doesn't ack them): + ok = publish(Topic1, <<"1">>, ?QOS_1), + ok = publish(Topic1, <<"2">>, ?QOS_2), + [Msg1, Msg2] = receive_messages(2), + ?assertMatch( + [ + #{payload := <<"1">>}, + #{payload := <<"2">>} + ], + [Msg1, Msg2] + ), + ok = publish(Topic2, <<"3">>, ?QOS_1), + ok = publish(Topic2, <<"4">>, ?QOS_2), + [Msg3, Msg4] = receive_messages(2), + ?assertMatch( + [ + #{payload := <<"3">>}, + #{payload := <<"4">>} + ], + [Msg3, Msg4] + ), + %% 3. Unsubscribe from the topic and disconnect: + ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)), + ok = emqtt:disconnect(Sub), + %% 5. Publish more messages to the disconnected topic: + ok = publish(Topic1, <<"5">>, ?QOS_1), + ok = publish(Topic1, <<"6">>, ?QOS_2), + %% 4. Reconnect the client. It must only receive only four + %% messages from the time when it was subscribed: + {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)), + %% Note: we ask for 6 messages, but expect only 4, it's + %% intentional: + ?assertMatch( + #{ + Topic1 := [<<"1">>, <<"2">>], + Topic2 := [<<"3">>, <<"4">>] + }, + get_topicwise_order(receive_messages(6, 5_000)), + debug_info(ClientId) + ), + %% 5. Now let's resubscribe, and check that the session can receive new messages: + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)), + ok = publish(Topic1, <<"7">>, ?QOS_0), + ok = publish(Topic1, <<"8">>, ?QOS_1), + ok = publish(Topic1, <<"9">>, ?QOS_2), + ?assertMatch( + [<<"7">>, <<"8">>, <<"9">>], + lists:map(fun get_msgpub_payload/1, receive_messages(3)) + ), + ok = emqtt:disconnect(Sub1). + t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), From 19c6d1127ff4e56f9cce82e08eb859dbcba6b1f1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 7 Feb 2024 12:41:51 +0100 Subject: [PATCH 4/4] refactor(sessds): Extract subscription mgmt logic to separate module --- apps/emqx/src/emqx_persistent_session_ds.erl | 92 +---------- ...persistent_session_ds_stream_scheduler.erl | 8 +- .../src/emqx_persistent_session_ds_subs.erl | 154 ++++++++++++++++++ 3 files changed, 167 insertions(+), 87 deletions(-) create mode 100644 apps/emqx/src/emqx_persistent_session_ds_subs.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d92e3cc24..16b6db8a9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -209,7 +209,7 @@ info(created_at, #{s := S}) -> info(is_persistent, #{}) -> true; info(subscriptions, #{s := S}) -> - subs_to_map(S); + emqx_persistent_session_ds_subs:to_map(S); info(subscriptions_cnt, #{s := S}) -> emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); info(subscriptions_max, #{props := Conf}) -> @@ -280,7 +280,7 @@ subscribe( SubOpts, Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, S0) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> %% TODO: max subscriptions @@ -322,7 +322,7 @@ subscribe( IsNew = false, S1 = S0 end, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1), + S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1), ?tp(persistent_session_ds_subscription_added, #{ topic_filter => TopicFilter, sub => Subscription, is_new => IsNew }), @@ -334,7 +334,7 @@ unsubscribe( TopicFilter, Session = #{id := ID, s := S0} ) -> - case subs_lookup(TopicFilter, S0) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; Subscription = #{props := SubOpts} -> @@ -344,13 +344,8 @@ unsubscribe( -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> - %% Note: we cannot delete the subscription immediately, since its - %% metadata can be used during replay (see `process_batch'). We - %% instead mark it as deleted, and let `subscription_gc' function - %% dispatch it later: - SubMeta = SubMeta0#{deleted => true}, - S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0), +do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> + S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), @@ -365,7 +360,7 @@ do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(TopicFilter, #{s := S}) -> - case subs_lookup(TopicFilter, S) of + case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of _Subscription = #{props := SubOpts} -> SubOpts; undefined -> @@ -465,7 +460,7 @@ handle_timeout( Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> - S1 = subscription_gc(S0), + S1 = emqx_persistent_session_ds_subs:gc(S0), S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), Interval = emqx_config:get([session_persistence, renew_streams_interval]), Session = emqx_session:ensure_timer( @@ -509,7 +504,6 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session0, Streams ), - logger:error("Replay streams: ~p~n~p", [Streams, Session]), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: @@ -687,7 +681,7 @@ session_drop(ID, Reason) -> case emqx_persistent_session_ds_state:open(ID) of {ok, S0} -> ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), - _S = subs_fold( + _S = emqx_persistent_session_ds_subs:fold( fun(TopicFilter, Subscription, S) -> do_unsubscribe(ID, TopicFilter, Subscription, S) end, @@ -905,74 +899,6 @@ do_drain_buffer(Inflight0, S0, Acc) -> %%-------------------------------------------------------------------------------- -%% @doc Remove subscriptions that have been marked for deletion, and -%% that don't have any unacked messages: -subscription_gc(S0) -> - subs_fold_all( - fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> - case Deleted andalso has_no_unacked_streams(SubId, S0) of - true -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); - false -> - Acc - end - end, - S0, - S0 - ). - -has_no_unacked_streams(SubId, S) -> - emqx_persistent_session_ds_state:fold_streams( - fun - ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; - (_StreamKey, _Srs, Acc) -> - Acc - end, - true, - S - ). - -%% @doc It only returns subscriptions that haven't been marked for deletion: -subs_lookup(TopicFilter, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of - #{deleted := true} -> - undefined; - Sub -> - Sub - end. - -subs_to_map(S) -> - subs_fold( - fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, - #{}, - S - ). - -subs_fold(Fun, AccIn, S) -> - subs_fold_all( - fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> - case Deleted of - true -> Acc; - false -> Fun(TopicFilter, Sub, Acc) - end - end, - AccIn, - S - ). - -%% @doc Iterate over all subscriptions, including the deleted ones: -subs_fold_all(Fun, AccIn, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, - AccIn, - Subs - ). - -%%-------------------------------------------------------------------------------- - %% TODO: find a more reliable way to perform actions that have side %% effects. Add `CBM:init' callback to the session behavior? -spec ensure_timers(session()) -> session(). diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index ae15f2bd6..286d32ef4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -126,10 +126,10 @@ find_new_streams(S) -> renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), - emqx_topic_gbt:fold( + emqx_persistent_session_ds_subs:fold( fun - (Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> - TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), + (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + TopicFilter = emqx_topic:words(Key), Streams = select_streams( SubId, emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), @@ -146,7 +146,7 @@ renew_streams(S0) -> Acc end, S2, - emqx_persistent_session_ds_state:get_subscriptions(S2) + S2 ). -spec on_unsubscribe( diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl new file mode 100644 index 000000000..92f17b108 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -0,0 +1,154 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module encapsulates the data related to the client's +%% subscriptions. It tries to reppresent the subscriptions as if they +%% were a simple key-value map. +%% +%% In reality, however, the session has to retain old the +%% subscriptions for longer to ensure the consistency of message +%% replay. +-module(emqx_persistent_session_ds_subs). + +%% API: +-export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). + +-export_type([]). + +%%================================================================================ +%% Type declarations +%%================================================================================ + +%%================================================================================ +%% API functions +%%================================================================================ + +%% @doc Process a new subscription +-spec on_subscribe( + emqx_persistent_session_ds:topic_filter(), + emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_state:t() +) -> + emqx_persistent_session_ds_state:t(). +on_subscribe(TopicFilter, Subscription, S) -> + emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). + +%% @doc Process UNSUBSCRIBE +-spec on_unsubscribe( + emqx_persistent_session_ds:topic_filter(), + emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_state:t() +) -> + emqx_persistent_session_ds_state:t(). +on_unsubscribe(TopicFilter, Subscription0, S0) -> + %% Note: we cannot delete the subscription immediately, since its + %% metadata can be used during replay (see `process_batch'). We + %% instead mark it as deleted, and let `subscription_gc' function + %% dispatch it later: + Subscription = Subscription0#{deleted => true}, + emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). + +%% @doc Remove subscriptions that have been marked for deletion, and +%% that don't have any unacked messages: +-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). +gc(S0) -> + fold_all( + fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> + case Deleted andalso has_no_unacked_streams(SubId, S0) of + true -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + false -> + Acc + end + end, + S0, + S0 + ). + +%% @doc Fold over active subscriptions: +-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds:subscription() | undefined. +lookup(TopicFilter, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of + #{deleted := true} -> + undefined; + Sub -> + Sub + end. + +%% @doc Convert active subscriptions to a map, for information +%% purpose: +-spec to_map(emqx_persistent_session_ds_state:t()) -> map(). +to_map(S) -> + fold( + fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + #{}, + S + ). + +%% @doc Fold over active subscriptions: +-spec fold( + fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), + Acc, + emqx_persistent_session_ds_state:t() +) -> + Acc. +fold(Fun, AccIn, S) -> + fold_all( + fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> + case Deleted of + true -> Acc; + false -> Fun(TopicFilter, Sub, Acc) + end + end, + AccIn, + S + ). + +%% @doc Fold over all subscriptions, including inactive ones: +-spec fold_all( + fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), + Acc, + emqx_persistent_session_ds_state:t() +) -> + Acc. +fold_all(Fun, AccIn, S) -> + Subs = emqx_persistent_session_ds_state:get_subscriptions(S), + emqx_topic_gbt:fold( + fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, + AccIn, + Subs + ). + +%%================================================================================ +%% Internal functions +%%================================================================================ + +-spec has_no_unacked_streams( + emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() +) -> boolean(). +has_no_unacked_streams(SubId, S) -> + emqx_persistent_session_ds_state:fold_streams( + fun + ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> + emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; + (_StreamKey, _Srs, Acc) -> + Acc + end, + true, + S + ).