From c569625dd17ba6a61a5530a0aed9f1699d0e0b29 Mon Sep 17 00:00:00 2001
From: Ilya Averyanov
Date: Mon, 8 Jul 2024 21:55:25 +0300
Subject: [PATCH] feat(queue): handle partially unacked ranges

---
 ...emqx_persistent_session_ds_shared_subs.erl |  60 +++++++-
 .../test/emqx_ds_shared_sub_SUITE.erl         | 136 +++++++++++-------
 2 files changed, 141 insertions(+), 55 deletions(-)

diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl
index bb4c62726..7535e1a61 100644
--- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl
@@ -31,6 +31,8 @@
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("session_internals.hrl").
+
+-include_lib("emqx/include/emqx_persistent_message.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 
 -export([
@@ -338,10 +340,8 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
             accept_stream(Event, S)
     end.
 
-%% TODO:
-%% handle unacked iterator
 accept_stream(
-    #{topic_filter := TopicFilter, stream := Stream, progress := #{iterator := Iterator}} = _Event,
+    #{topic_filter := TopicFilter, stream := Stream, progress := Progress} = _Event,
     S0
 ) ->
     case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
@@ -361,6 +361,7 @@ accept_stream(
                 end,
             case NeedCreateStream of
                 true ->
+                    Iterator = rewind_iterator(Progress),
                     NewSRS =
                         #srs{
                             rank_x = ?rank_x,
@@ -376,6 +377,57 @@ accept_stream(
             end
     end.
 
+%% Skip acked messages.
+%% This may be a bit inefficient, and it is unclear how to handle errors.
+%%
+%% A better variant would be to wrap the iterator on `emqx_ds` level in a new one,
+%% that will skip acked messages internally in `emqx_ds:next` function.
+%% Unluckily, emqx_ds does not have a wrapping structure around iterators of
+%% the underlying levels, so we cannot wrap it without a risk of confusion.
+
+rewind_iterator(#{iterator := Iterator, acked := true}) ->
+    Iterator;
+rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := 0, qos2_acked := 0}) ->
+    Iterator0;
+%% This should not happen; it means the DS is inconsistent
+rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := Q1, qos2_acked := Q2}) when
+    Q1 < 0 orelse Q2 < 0
+->
+    Iterator0;
+rewind_iterator(
+    #{iterator := Iterator0, acked := false, qos1_acked := Q1Old, qos2_acked := Q2Old} = Progress
+) ->
+    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, Iterator0, Q1Old + Q2Old) of
+        {ok, Iterator1, []} ->
+            %% An empty batch leaves both counters unchanged, so recursing
+            %% would never terminate. Stop rewinding here.
+            Iterator1;
+        {ok, Iterator1, Messages} ->
+            {Q1New, Q2New} = update_qos_acked(Q1Old, Q2Old, Messages),
+            rewind_iterator(Progress#{
+                iterator => Iterator1, qos1_acked => Q1New, qos2_acked => Q2New
+            });
+        {ok, end_of_stream} ->
+            end_of_stream;
+        {error, _, _} ->
+            %% What to do here?
+            %% In the wrapping variant we do not have this problem.
+            Iterator0
+    end.
+
+%% Decrement the QoS 1 / QoS 2 counter once for each message of that QoS in the batch.
+update_qos_acked(Q1, Q2, []) ->
+    {Q1, Q2};
+update_qos_acked(Q1, Q2, [{_Key, Message} | Messages]) ->
+    case emqx_message:qos(Message) of
+        ?QOS_1 ->
+            update_qos_acked(Q1 - 1, Q2, Messages);
+        ?QOS_2 ->
+            update_qos_acked(Q1, Q2 - 1, Messages);
+        _ ->
+            update_qos_acked(Q1, Q2, Messages)
+    end.
+
 revoke_stream(
     #{topic_filter := TopicFilter, stream := Stream}, S0
 ) ->
@@ -543,7 +595,7 @@ stream_progresses(S, StreamKeys) ->
 
 on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
     S1 = revoke_all_streams(S0),
-    Progresses = all_stream_progresses(S1, Agent0),
+    Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
     Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
     SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
     {S1, SharedSubS1}.
diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
index 0f665b5a3..dfc2203c4 100644
--- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
+++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
@@ -183,51 +183,6 @@ t_graceful_disconnect(_Config) ->
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnPub).
 
-t_disconnect_no_double_replay(_Config) ->
-    ConnPub = emqtt_connect_pub(<<"client_pub">>),
-
-    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
-    {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1),
-
-    ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
-    {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1),
-
-    ct:sleep(1000),
-
-    NPubs = 10_000,
-
-    Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>],
-    ok = publish_n(ConnPub, Topics, 1, NPubs),
-
-    Self = self(),
-    _ = spawn_link(fun() ->
-        ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
-        Self ! publish_done
-    end),
-
-    ok = emqtt:disconnect(ConnShared2),
-
-    receive
-        publish_done -> ok
-    end,
-
-    Pubs = drain_publishes(),
-
-    ClientByBid = fun(Pid) ->
-        case Pid of
-            ConnShared1 -> <<"client_shared1">>;
-            ConnShared2 -> <<"client_shared2">>
-        end
-    end,
-
-    {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
-
-    ?assertEqual([], Missing),
-    ?assertEqual([], Duplicate),
-
-    ok = emqtt:disconnect(ConnShared1),
-    ok = emqtt:disconnect(ConnPub).
-
 t_intensive_reassign(_Config) ->
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
 
@@ -373,6 +328,80 @@ t_quick_resubscribe(_Config) ->
     ok = emqtt:disconnect(ConnShared2),
     ok = emqtt:disconnect(ConnPub).
 
+t_disconnect_no_double_replay1(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr11/topic11/#">>, 1),
+
+    ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr11/topic11/#">>, 1),
+
+    ct:sleep(1000),
+
+    NPubs = 10_000,
+
+    Topics = [<<"topic11/1">>, <<"topic11/2">>, <<"topic11/3">>],
+    ok = publish_n(ConnPub, Topics, 1, NPubs),
+
+    Self = self(),
+    _ = spawn_link(fun() ->
+        ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
+        Self ! publish_done
+    end),
+
+    ok = emqtt:disconnect(ConnShared2),
+
+    receive
+        publish_done -> ok
+    end,
+
+    Pubs = drain_publishes(),
+
+    ClientByBid = fun(Pid) ->
+        case Pid of
+            ConnShared1 -> <<"client_shared1">>;
+            ConnShared2 -> <<"client_shared2">>
+        end
+    end,
+
+    {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
+
+    ?assertEqual([], Missing),
+    ?assertEqual([], Duplicate),
+
+    ok = emqtt:disconnect(ConnShared1),
+    ok = emqtt:disconnect(ConnPub).
+
+t_disconnect_no_double_replay2(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>, [{auto_ack, false}]),
+    {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr12/topic12/#">>, 1),
+
+    ct:sleep(1000),
+
+    ok = publish_n(ConnPub, [<<"topic12/1">>], 1, 20),
+
+    receive
+        {publish, #{payload := <<"1">>, packet_id := PacketId1}} ->
+            ok = emqtt:puback(ConnShared1, PacketId1)
+    after 5000 ->
+        ct:fail("No publish received")
+    end,
+
+    ok = emqtt:disconnect(ConnShared1),
+
+    ConnShared12 = emqtt_connect_sub(<<"client_shared12">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared12, <<"$share/gr12/topic12/#">>, 1),
+
+    %% Message "1" was acked by the previous group member before it disconnected,
+    %% so it must not be replayed to the new one.
+    ?assertNotReceive({publish, #{payload := <<"1">>}}, 3000),
+
+    ok = emqtt:disconnect(ConnShared12),
+    ok = emqtt:disconnect(ConnPub).
+
 t_lease_reconnect(_Config) ->
     ConnPub = emqtt_connect_pub(<<"client_pub">>),
 
@@ -432,12 +461,17 @@ t_renew_lease_timeout(_Config) ->
 %%--------------------------------------------------------------------
 
 emqtt_connect_sub(ClientId) ->
-    {ok, C} = emqtt:start_link([
-        {clientid, ClientId},
-        {clean_start, true},
-        {proto_ver, v5},
-        {properties, #{'Session-Expiry-Interval' => 7_200}}
-    ]),
+    emqtt_connect_sub(ClientId, []).
+
+emqtt_connect_sub(ClientId, Options) ->
+    {ok, C} = emqtt:start_link(
+        [
+            {clientid, ClientId},
+            {clean_start, true},
+            {proto_ver, v5},
+            {properties, #{'Session-Expiry-Interval' => 7_200}}
+        ] ++ Options
+    ),
     {ok, _} = emqtt:connect(C),
     C.