feat(queue): handle partially unacked ranges

This commit is contained in:
Ilya Averyanov 2024-07-08 21:55:25 +03:00
parent 7daab1ab23
commit c569625dd1
2 changed files with 136 additions and 55 deletions

View File

@ -31,6 +31,8 @@
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-include("session_internals.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
@ -338,10 +340,8 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
accept_stream(Event, S)
end.
%% TODO:
%% handle unacked iterator
accept_stream(
#{topic_filter := TopicFilter, stream := Stream, progress := #{iterator := Iterator}} = _Event,
#{topic_filter := TopicFilter, stream := Stream, progress := Progress} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
@ -361,6 +361,7 @@ accept_stream(
end,
case NeedCreateStream of
true ->
Iterator = rewind_iterator(Progress),
NewSRS =
#srs{
rank_x = ?rank_x,
@ -376,6 +377,52 @@ accept_stream(
end
end.
%% Skip acked messages.
%%
%% Takes a stream progress map (`iterator', `acked', `qos1_acked',
%% `qos2_acked') and returns the iterator to start reading from:
%%   * when `acked' is `true', or both counters are zero, the iterator is
%%     returned unchanged;
%%   * otherwise messages are read from `?PERSISTENT_MESSAGE_DB' and, for each
%%     QoS 1 / QoS 2 message seen, the matching counter is decremented until
%%     both counters reach zero;
%%   * `end_of_stream' is returned if the stream ends while skipping.
%%
%% This may be a bit inefficient, and it is unclear how to handle errors.
%%
%% A better variant would be to wrap the iterator on `emqx_ds` level in a new one,
%% that will skip acked messages internally in `emqx_ds:next` function.
%% Unluckily, emqx_ds does not have a wrapping structure around iterators of
%% the underlying levels, so we cannot wrap it without a risk of confusion.
rewind_iterator(#{iterator := Iterator, acked := true}) ->
    Iterator;
rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := 0, qos2_acked := 0}) ->
    Iterator0;
%% This should not happen; it means the DS is inconsistent.
rewind_iterator(#{iterator := Iterator0, acked := false, qos1_acked := Q1, qos2_acked := Q2}) when
    Q1 < 0 orelse Q2 < 0
->
    Iterator0;
rewind_iterator(
    #{iterator := Iterator0, acked := false, qos1_acked := Q1Old, qos2_acked := Q2Old} = Progress
) ->
    case emqx_ds:next(?PERSISTENT_MESSAGE_DB, Iterator0, Q1Old + Q2Old) of
        {ok, Iterator1, []} ->
            %% Nothing could be read although the counters are still positive.
            %% Recursing would leave the counters unchanged and spin forever,
            %% so stop at the position reached so far.
            Iterator1;
        {ok, Iterator1, Messages} ->
            {Q1New, Q2New} = update_qos_acked(Q1Old, Q2Old, Messages),
            rewind_iterator(Progress#{
                iterator => Iterator1, qos1_acked => Q1New, qos2_acked => Q2New
            });
        {ok, end_of_stream} ->
            end_of_stream;
        {error, _, _} ->
            %% What to do here?
            %% In the wrapping variant we do not have this problem.
            Iterator0
    end.
%% Walk over a batch of `{Key, Message}' pairs and decrement the remaining
%% QoS 1 / QoS 2 counters once per message of the corresponding QoS.
%% Messages of any other QoS leave both counters untouched.
%% Returns `{Q1Left, Q2Left}'.
update_qos_acked(Q1, Q2, Messages) ->
    lists:foldl(
        fun({_Key, Message}, {Left1, Left2}) ->
            case emqx_message:qos(Message) of
                ?QOS_1 -> {Left1 - 1, Left2};
                ?QOS_2 -> {Left1, Left2 - 1};
                _ -> {Left1, Left2}
            end
        end,
        {Q1, Q2},
        Messages
    ).
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
) ->
@ -543,7 +590,7 @@ stream_progresses(S, StreamKeys) ->
%% Handle session disconnect for shared subscriptions.
%%
%% Revokes all streams in the session state, collects the stream progresses
%% and hands them to the shared-subs agent's `on_disconnect/2'. Returns the
%% updated session state together with the shared-subs state, in which the
%% agent is replaced and `scheduled_actions' is reset to an empty map.
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
    S1 = revoke_all_streams(S0),
    %% Compute the progresses exactly once. Binding `Progresses' from the
    %% arity-2 call and then matching it against the arity-3 call would do the
    %% work twice and crash with `badmatch' whenever the two results differ.
    Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
    Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
    SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
    {S1, SharedSubS1}.

View File

@ -183,51 +183,6 @@ t_graceful_disconnect(_Config) ->
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
%% Two clients subscribe to the same shared group (`$share/gr9/topic9/#').
%% One of them is disconnected right after a second publishing round has been
%% started in a separate process. The test then asserts that
%% verify_received_pubs/3 reports neither missing nor duplicate publishes.
t_disconnect_no_double_replay(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1),
%% NOTE(review): presumably gives the subscriptions time to settle - confirm.
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>],
%% First round runs synchronously in the test process.
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
%% Second round runs in a linked process so that the disconnect below can be
%% issued without waiting for it; completion is signalled with `publish_done'.
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ok = emqtt:disconnect(ConnShared2),
%% No `after' clause here: the wait is unbounded at this level.
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
%% Maps a client pid to its client id; matches against the already bound
%% ConnShared1 / ConnShared2 and crashes on any other pid.
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnPub).
t_intensive_reassign(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
@ -373,6 +328,80 @@ t_quick_resubscribe(_Config) ->
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
%% Two clients subscribe to the same shared group (`$share/gr11/topic11/#').
%% One of them is disconnected right after a second publishing round has been
%% started in a separate process. The test then asserts that
%% verify_received_pubs/3 reports neither missing nor duplicate publishes.
t_disconnect_no_double_replay1(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr11/topic11/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr11/topic11/#">>, 1),
%% NOTE(review): presumably gives the subscriptions time to settle - confirm.
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic11/1">>, <<"topic11/2">>, <<"topic11/3">>],
%% First round runs synchronously in the test process.
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
%% Second round runs in a linked process so that the disconnect below can be
%% issued without waiting for it; completion is signalled with `publish_done'.
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ok = emqtt:disconnect(ConnShared2),
%% No `after' clause here: the wait is unbounded at this level.
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
%% Maps a client pid to its client id; matches against the already bound
%% ConnShared1 / ConnShared2 and crashes on any other pid.
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnPub).
%% A single shared subscriber with `auto_ack' disabled receives a batch of
%% publishes and explicitly acks only the one with payload <<"1">>, then
%% disconnects. A second client joining the same group must not be delivered
%% payload <<"1">> again within 3 seconds.
t_disconnect_no_double_replay2(_Config) ->
    ConnPub = emqtt_connect_pub(<<"client_pub">>),
    ConnShared1 = emqtt_connect_sub(<<"client_shared1">>, [{auto_ack, false}]),
    {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr12/topic12/#">>, 1),

    %% NOTE(review): presumably gives the subscription time to settle - confirm.
    ct:sleep(1000),
    ok = publish_n(ConnPub, [<<"topic12/1">>], 1, 20),

    %% Ack only the first message by hand; everything else stays unacked.
    receive
        {publish, #{payload := <<"1">>, packet_id := PacketId1}} ->
            ok = emqtt:puback(ConnShared1, PacketId1)
    after 5000 ->
        ct:fail("No publish received")
    end,

    ok = emqtt:disconnect(ConnShared1),

    ConnShared12 = emqtt_connect_sub(<<"client_shared12">>),
    {ok, _, _} = emqtt:subscribe(ConnShared12, <<"$share/gr12/topic12/#">>, 1),

    ?assertNotReceive(
        {publish, #{payload := <<"1">>}},
        3000
    ),

    ok = emqtt:disconnect(ConnShared12),
    %% Release the publisher as well, as the sibling test cases do.
    ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
@ -432,12 +461,17 @@ t_renew_lease_timeout(_Config) ->
%%--------------------------------------------------------------------
%% Start and connect a subscriber client with the default option set.
%% Delegates to emqtt_connect_sub/2 with no extra options; starting a client
%% here as well would leave a second, never-connected linked client process
%% using the same client id.
emqtt_connect_sub(ClientId) ->
    emqtt_connect_sub(ClientId, []).
%% Start an MQTT v5 subscriber client with a clean start and a session expiry
%% interval of 7200, append the caller-supplied `Options' to these defaults,
%% connect it and return the client pid.
emqtt_connect_sub(ClientId, Options) ->
    Defaults = [
        {clientid, ClientId},
        {clean_start, true},
        {proto_ver, v5},
        {properties, #{'Session-Expiry-Interval' => 7_200}}
    ],
    {ok, Client} = emqtt:start_link(Defaults ++ Options),
    {ok, _} = emqtt:connect(Client),
    Client.