From 3000a8f286b84b4cc38fdf95ef4ee8a7e57f5eac Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 5 Feb 2024 23:48:14 +0100 Subject: [PATCH] fix(sessds): Postpone deletion of the subscription until fully acked --- apps/emqx/src/emqx_persistent_session_ds.erl | 69 ++++++++++++++-- ...persistent_session_ds_stream_scheduler.erl | 53 +++++++------ .../test/emqx_persistent_session_SUITE.erl | 79 ++++++++++++++++++- 3 files changed, 170 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index cf027bd47..d92e3cc24 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -117,7 +117,7 @@ id := subscription_id(), start_time := emqx_ds:time(), props := map(), - extra := map() + deleted := boolean() }. -define(TIMER_PULL, timer_pull). @@ -313,7 +313,8 @@ subscribe( Subscription = #{ start_time => now_ms(), props => SubOpts, - id => SubId + id => SubId, + deleted => false }, IsNew = true; Subscription0 = #{} -> @@ -343,12 +344,17 @@ unsubscribe( -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), +do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) -> + %% Note: we cannot delete the subscription immediately, since its + %% metadata can be used during replay (see `process_batch'). 
We + %% instead mark it as deleted, and let `subscription_gc' function + %% dispatch it later: + SubMeta = SubMeta0#{deleted => true}, + S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), - S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1), + S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), ?tp_span( persistent_session_ds_subscription_route_delete, #{session_id => SessionId, topic_filter => TopicFilter}, @@ -459,7 +465,8 @@ handle_timeout( Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1), {ok, Publishes, Session}; handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> - S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S0), + S1 = subscription_gc(S0), + S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), Interval = emqx_config:get([session_persistence, renew_streams_interval]), Session = emqx_session:ensure_timer( ?TIMER_GET_STREAMS, @@ -502,6 +509,7 @@ replay(ClientInfo, [], Session0 = #{s := S0}) -> Session0, Streams ), + logger:error("Replay streams: ~p~n~p", [Streams, Session]), %% Note: we filled the buffer with the historical messages, and %% from now on we'll rely on the normal inflight/flow control %% mechanisms to replay them: @@ -897,9 +905,43 @@ do_drain_buffer(Inflight0, S0, Acc) -> %%-------------------------------------------------------------------------------- +%% @doc Remove subscriptions that have been marked for deletion, and +%% that don't have any unacked messages: +subscription_gc(S0) -> + subs_fold_all( + fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> + case Deleted andalso has_no_unacked_streams(SubId, S0) of + true -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + false -> + Acc + end + end, + S0, + S0 + ). 
+ +has_no_unacked_streams(SubId, S) -> + emqx_persistent_session_ds_state:fold_streams( + fun + ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> + emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; + (_StreamKey, _Srs, Acc) -> + Acc + end, + true, + S + ). + +%% @doc It only returns subscriptions that haven't been marked for deletion: subs_lookup(TopicFilter, S) -> Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined). + case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of + #{deleted := true} -> + undefined; + Sub -> + Sub + end. subs_to_map(S) -> subs_fold( @@ -909,6 +951,19 @@ subs_to_map(S) -> ). subs_fold(Fun, AccIn, S) -> + subs_fold_all( + fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> + case Deleted of + true -> Acc; + false -> Fun(TopicFilter, Sub, Acc) + end + end, + AccIn, + S + ). + +%% @doc Iterate over all subscriptions, including the deleted ones: +subs_fold_all(Fun, AccIn, S) -> Subs = emqx_persistent_session_ds_state:get_subscriptions(S), emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 745a3f948..ae15f2bd6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -16,8 +16,8 @@ -module(emqx_persistent_session_ds_stream_scheduler). %% API: --export([find_new_streams/1, find_replay_streams/1]). --export([renew_streams/1, del_subscription/2]). +-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]). +-export([renew_streams/1, on_unsubscribe/2]). %% behavior callbacks: -export([]). 
@@ -127,30 +127,33 @@ renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), emqx_topic_gbt:fold( - fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, Acc) -> - TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), - Streams = select_streams( - SubId, - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + fun + (Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)), + Streams = select_streams( + SubId, + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + Acc + ), + lists:foldl( + fun(I, Acc1) -> + ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) + end, + Acc, + Streams + ); + (_Key, _DeletedSubscription, Acc) -> Acc - ), - lists:foldl( - fun(I, Acc1) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) - end, - Acc, - Streams - ) end, S2, emqx_persistent_session_ds_state:get_subscriptions(S2) ). --spec del_subscription( +-spec on_unsubscribe( emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() ) -> emqx_persistent_session_ds_state:t(). -del_subscription(SubId, S0) -> +on_unsubscribe(SubId, S0) -> %% NOTE: this function only marks the streams for deletion, %% instead of outright deleting them. %% @@ -170,13 +173,13 @@ del_subscription(SubId, S0) -> %% `renew_streams', when it detects that all in-flight messages %% from the stream have been acked by the client. emqx_persistent_session_ds_state:fold_streams( - fun(Key, ReplayState, Acc) -> + fun(Key, Srs, Acc) -> case Key of {SubId, _Stream} -> %% This stream belongs to a deleted subscription. %% Mark for deletion: emqx_persistent_session_ds_state:put_stream( - Key, ReplayState#srs{unsubscribed = true}, Acc + Key, Srs#srs{unsubscribed = true}, Acc ); _ -> Acc @@ -186,6 +189,14 @@ del_subscription(SubId, S0) -> S0 ). 
+-spec is_fully_acked( + emqx_persistent_session_ds:stream_state(), emqx_persistent_session_ds_state:t() +) -> boolean(). +is_fully_acked(Srs, S) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), + is_fully_acked(CommQos1, CommQos2, Srs). + %%================================================================================ %% Internal functions %%================================================================================ @@ -207,10 +218,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> it_end = Iterator }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); - SRS = #srs{unsubscribed = true} -> - %% The session resubscribed to the stream after - %% unsubscribing. Spare the stream: - emqx_persistent_session_ds_state:put_stream(Key, SRS#srs{unsubscribed = false}, S); #srs{} -> S end. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index bdd3e367f..f8ee11c08 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -278,7 +278,10 @@ publish_many(Messages) -> publish_many(Messages, WaitForUnregister) -> Fun = fun(Client, Message) -> - {ok, _} = emqtt:publish(Client, Message) + case emqtt:publish(Client, Message) of + ok -> ok; + {ok, _} -> ok + end end, do_publish(Messages, Fun, WaitForUnregister). @@ -1026,6 +1029,80 @@ t_unsubscribe(Config) -> ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ok = emqtt:disconnect(Client). +%% This testcase verifies that un-acked messages that were once sent +%% to the client are still retransmitted after the session +%% unsubscribes from the topic and reconnects. 
+t_unsubscribe_replay(Config) -> + ConnFun = ?config(conn_fun, Config), + TopicPrefix = ?config(topic, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + ClientOpts = [ + {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}}, + {max_inflight, 10} + | Config + ], + {ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]), + {ok, _} = emqtt:ConnFun(Sub), + %% 1. Make two subscriptions, one is to be deleted: + Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]), + Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)), + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)), + %% 2. Publish 2 messages to the first and second topics each + %% (client doesn't ack them): + ok = publish(Topic1, <<"1">>, ?QOS_1), + ok = publish(Topic1, <<"2">>, ?QOS_2), + [Msg1, Msg2] = receive_messages(2), + ?assertMatch( + [ + #{payload := <<"1">>}, + #{payload := <<"2">>} + ], + [Msg1, Msg2] + ), + ok = publish(Topic2, <<"3">>, ?QOS_1), + ok = publish(Topic2, <<"4">>, ?QOS_2), + [Msg3, Msg4] = receive_messages(2), + ?assertMatch( + [ + #{payload := <<"3">>}, + #{payload := <<"4">>} + ], + [Msg3, Msg4] + ), + %% 3. Unsubscribe from the topic and disconnect: + ?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)), + ok = emqtt:disconnect(Sub), + %% 4. Publish more messages to the unsubscribed topic: + ok = publish(Topic1, <<"5">>, ?QOS_1), + ok = publish(Topic1, <<"6">>, ?QOS_2), + %% 5. Reconnect the client.
It must receive only the four + %% messages from the time when it was subscribed: + {ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]), + ?assertMatch({ok, _}, emqtt:ConnFun(Sub1)), + %% Note: we ask for 6 messages but expect only 4; this is + %% intentional: + ?assertMatch( + #{ + Topic1 := [<<"1">>, <<"2">>], + Topic2 := [<<"3">>, <<"4">>] + }, + get_topicwise_order(receive_messages(6, 5_000)), + debug_info(ClientId) + ), + %% 6. Now let's resubscribe, and check that the session can receive new messages: + ?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)), + ok = publish(Topic1, <<"7">>, ?QOS_0), + ok = publish(Topic1, <<"8">>, ?QOS_1), + ok = publish(Topic1, <<"9">>, ?QOS_2), + ?assertMatch( + [<<"7">>, <<"8">>, <<"9">>], + lists:map(fun get_msgpub_payload/1, receive_messages(3)) + ), + ok = emqtt:disconnect(Sub1). + t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config),