feat(queue): move replay progress to a separate data structure

This commit is contained in:
Ilya Averyanov 2024-07-08 19:57:36 +03:00
parent 077ee38530
commit 7daab1ab23
7 changed files with 267 additions and 85 deletions

View File

@ -993,11 +993,12 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) -> fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0), {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
LFS = maps:get(last_fetched_stream, Session0, beginning), Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
LFS = maps:get(last_fetched_stream, Session1, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1), ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
BatchSize = get_config(ClientInfo, [batch_size]), BatchSize = get_config(ClientInfo, [batch_size]),
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo), Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
Session1#{shared_sub_s => SharedSubS1}. Session2#{shared_sub_s => SharedSubS1}.
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) -> fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
#{inflight := Inflight} = Session0, #{inflight := Inflight} = Session0,

View File

@ -48,6 +48,8 @@
to_map/2 to_map/2
]). ]).
-define(EPOCH_BITS, 15).
-define(schedule_subscribe, schedule_subscribe). -define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe). -define(schedule_unsubscribe, schedule_unsubscribe).
@ -58,10 +60,22 @@
-type agent_stream_progress() :: #{ -type agent_stream_progress() :: #{
stream := emqx_ds:stream(), stream := emqx_ds:stream(),
iterator := emqx_ds:iterator(), progress := progress(),
use_finished := boolean() use_finished := boolean()
}. }.
-type progress() ::
#{
acked := true,
iterator := emqx_ds:iterator()
}
| #{
acked := false,
iterator := emqx_ds:iterator(),
qos1_acked := boolean(),
qos2_acked := boolean()
}.
-type scheduled_action() :: #{ -type scheduled_action() :: #{
type := scheduled_action_type(), type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()], stream_keys_to_wait := [stream_key()],
@ -82,6 +96,11 @@
-define(rank_x, rank_shared). -define(rank_x, rank_shared).
-define(rank_y, 0). -define(rank_y, 0).
-export_type([
progress/0,
agent_stream_progress/0
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -290,7 +309,9 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0 Agent0
), ),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}), ?tp(warning, shared_subs_new_stream_lease_events, #{
stream_lease_events => format_lease_events(StreamLeaseEvents)
}),
S1 = lists:foldl( S1 = lists:foldl(
fun fun
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions); (#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
@ -317,8 +338,11 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
accept_stream(Event, S) accept_stream(Event, S)
end. end.
%% TODO:
%% handle unacked iterator
accept_stream( accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0 #{topic_filter := TopicFilter, stream := Stream, progress := #{iterator := Iterator}} = _Event,
S0
) -> ) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
undefined -> undefined ->
@ -326,8 +350,17 @@ accept_stream(
S0; S0;
#{id := SubId, current_state := SStateId} -> #{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream}, Key = {SubId, Stream},
NeedCreateStream =
case emqx_persistent_session_ds_state:get_stream(Key, S0) of case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined -> undefined ->
true;
#srs{unsubscribed = true} ->
true;
_SRS ->
false
end,
case NeedCreateStream of
true ->
NewSRS = NewSRS =
#srs{ #srs{
rank_x = ?rank_x, rank_x = ?rank_x,
@ -338,7 +371,7 @@ accept_stream(
}, },
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0), S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
S1; S1;
_SRS -> false ->
S0 S0
end end
end. end.
@ -371,22 +404,30 @@ revoke_stream(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds_state:t(),
t() t()
) -> {emqx_persistent_session_ds_state:t(), t()}. ) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0) -> on_streams_replay(S0, SharedSubS0) ->
Progresses = stream_progresses(S), {S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
renew_streams(S0, SharedSubS0),
Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses Agent0, Progresses
), ),
{Agent2, ScheduledActions1} = run_scheduled_actions(S, Agent1, ScheduledActions0), {Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
SharedSubS1 = SharedSubS0#{ SharedSubS2 = SharedSubS1#{
agent => Agent2, agent => Agent2,
scheduled_actions => ScheduledActions1 scheduled_actions => ScheduledActions1
}, },
{S, SharedSubS1}. {S1, SharedSubS2}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% on_streams_replay internal functions %% on_streams_replay internal functions
stream_progresses(S) -> all_stream_progresses(S, Agent) ->
all_stream_progresses(S, Agent, _NeedUnacked = false).
all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states( fold_shared_stream_states(
fun( fun(
#share{group = Group}, #share{group = Group},
@ -394,9 +435,12 @@ stream_progresses(S) ->
SRS, SRS,
ProgressesAcc0 ProgressesAcc0
) -> ) ->
case is_stream_fully_acked(S, SRS) of case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
of
true -> true ->
StreamProgress = stream_progress(S, Stream, SRS), StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with( maps:update_with(
Group, Group,
fun(Progresses) -> [StreamProgress | Progresses] end, fun(Progresses) -> [StreamProgress | Progresses] end,
@ -437,7 +481,7 @@ run_scheduled_action(
[] -> [] ->
?tp(warning, shared_subs_schedule_action_complete, #{ ?tp(warning, shared_subs_schedule_action_complete, #{
topic_filter => TopicFilter, topic_filter => TopicFilter,
progresses => format_streams(Progresses1), progresses => format_stream_progresses(Progresses1),
type => Type type => Type
}), }),
%% Regular progress won't se unsubscribed streams, so we need to %% Regular progress won't se unsubscribed streams, so we need to
@ -467,6 +511,8 @@ run_scheduled_action(
end. end.
filter_unfinished_streams(S, StreamKeysToWait) -> filter_unfinished_streams(S, StreamKeysToWait) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:filter( lists:filter(
fun(Key) -> fun(Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of case emqx_persistent_session_ds_state:get_stream(Key, S) of
@ -475,21 +521,19 @@ filter_unfinished_streams(S, StreamKeysToWait) ->
%% in completed state before deletion %% in completed state before deletion
true; true;
SRS -> SRS ->
not is_stream_fully_acked(S, SRS) not is_stream_fully_acked(CommQos1, CommQos2, SRS)
end end
end, end,
StreamKeysToWait StreamKeysToWait
). ).
stream_progresses(S, StreamKeys) -> stream_progresses(S, StreamKeys) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:map( lists:map(
fun({_SubId, Stream} = Key) -> fun({_SubId, Stream} = Key) ->
#srs{it_end = ItEnd} = SRS = emqx_persistent_session_ds_state:get_stream(Key, S), SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
#{ stream_progress(CommQos1, CommQos2, Stream, SRS)
stream => Stream,
iterator => ItEnd,
use_finished => is_use_finished(S, SRS)
}
end, end,
StreamKeys StreamKeys
). ).
@ -499,7 +543,7 @@ stream_progresses(S, StreamKeys) ->
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0), S1 = revoke_all_streams(S0),
Progresses = stream_progresses(S1), Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}}, SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}. {S1, SharedSubS1}.
@ -565,12 +609,41 @@ stream_keys_by_sub_id(S, MatchSubId) ->
S S
). ).
stream_progress(S, Stream, #srs{it_end = EndIt} = SRS) -> stream_progress(
CommQos1,
CommQos2,
Stream,
#srs{
it_end = EndIt,
it_begin = BeginIt,
first_seqno_qos1 = StartQos1,
first_seqno_qos2 = StartQos2
} = SRS
) ->
Qos1Acked = seqno_diff(?QOS_1, CommQos1, StartQos1),
Qos2Acked = seqno_diff(?QOS_2, CommQos2, StartQos2),
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true ->
#{ #{
stream => Stream, stream => Stream,
iterator => EndIt, progress => #{
use_finished => is_use_finished(S, SRS) acked => true,
}. iterator => EndIt
},
use_finished => is_use_finished(SRS)
};
false ->
#{
stream => Stream,
progress => #{
acked => false,
iterator => BeginIt,
qos1_acked => Qos1Acked,
qos2_acked => Qos2Acked
},
use_finished => is_use_finished(SRS)
}
end.
fold_shared_subs(Fun, Acc, S) -> fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions( emqx_persistent_session_ds_state:fold_subscriptions(
@ -618,11 +691,30 @@ agent_opts(#{session_id := SessionId}) ->
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).
is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) -> is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
Unsubscribed. Unsubscribed.
is_stream_fully_acked(S, SRS) -> is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). (CommQos1 >= Q1) or (CommQos2 >= Q2).
is_stream_fully_acked(_, _, #srs{
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
}) ->
%% Streams where the last chunk doesn't contain any QoS1 and 2
%% messages are considered fully acked:
true;
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
(Comm1 >= S1) andalso (Comm2 >= S2).
-dialyzer({nowarn_function, seqno_diff/3}).
seqno_diff(?QOS_1, A, B) ->
%% For QoS1 messages we skip a seqno every time the epoch changes,
%% we need to substract that from the diff:
EpochA = A bsr ?EPOCH_BITS,
EpochB = B bsr ?EPOCH_BITS,
A - B - (EpochA - EpochB);
seqno_diff(?QOS_2, A, B) ->
A - B.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Formatters %% Formatters
@ -633,21 +725,24 @@ format_schedule_action(#{
}) -> }) ->
#{ #{
type => Type, type => Type,
progresses => format_streams(Progresses), progresses => format_stream_progresses(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait) stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}. }.
format_streams(Streams) -> format_stream_progresses(Streams) ->
lists:map( lists:map(
fun format_stream/1, fun format_stream_progress/1,
Streams Streams
). ).
format_stream(#{stream := Stream, iterator := Iterator} = Value) -> format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_stream_key({SubId, Stream}) -> format_progress(#{iterator := Iterator} = Progress) ->
{SubId, format_opaque(Stream)}. Progress#{iterator => format_opaque(Iterator)}.
format_stream_key(beginning) -> beginning;
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) -> format_stream_keys(StreamKeys) ->
lists:map( lists:map(
@ -655,5 +750,16 @@ format_stream_keys(StreamKeys) ->
StreamKeys StreamKeys
). ).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
format_opaque(Opaque) -> format_opaque(Opaque) ->
erlang:phash2(Opaque). erlang:phash2(Opaque).

View File

@ -45,11 +45,13 @@
send_after := fun((non_neg_integer(), term()) -> reference()) send_after := fun((non_neg_integer(), term()) -> reference())
}. }.
-type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-type stream_lease_event() :: -type stream_lease_event() ::
#{ #{
type => lease, type => lease,
stream => emqx_ds:stream(), stream => emqx_ds:stream(),
iterator => emqx_ds:iterator() progress => progress()
} }
| #{ | #{
type => revoke, type => revoke,
@ -60,7 +62,7 @@
#{ #{
type => lease, type => lease,
stream => emqx_ds:stream(), stream => emqx_ds:stream(),
iterator => emqx_ds:iterator(), progress => progress(),
topic_filter => emqx_persistent_session_ds:share_topic_filter() topic_filter => emqx_persistent_session_ds:share_topic_filter()
} }
| #{ | #{
@ -81,13 +83,13 @@
-type connecting_data() :: #{}. -type connecting_data() :: #{}.
-type replaying_data() :: #{ -type replaying_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(), leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => emqx_ds:iterator()}, streams => #{emqx_ds:stream() => progress()},
version => emqx_ds_shared_sub_proto:version(), version => emqx_ds_shared_sub_proto:version(),
prev_version => undefined prev_version => undefined
}. }.
-type updating_data() :: #{ -type updating_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(), leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => emqx_ds:iterator()}, streams => #{emqx_ds:stream() => progress()},
version => emqx_ds_shared_sub_proto:version(), version => emqx_ds_shared_sub_proto:version(),
prev_version => emqx_ds_shared_sub_proto:version() prev_version => emqx_ds_shared_sub_proto:version()
}. }.
@ -275,18 +277,18 @@ handle_leader_update_streams(
id => Id, id => Id,
version_old => VersionOld, version_old => VersionOld,
version_new => VersionNew, version_new => VersionNew,
stream_progresses => emqx_ds_shared_sub_proto:format_streams(StreamProgresses) stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses)
}), }),
{AddEvents, Streams1} = lists:foldl( {AddEvents, Streams1} = lists:foldl(
fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) -> fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
case maps:is_key(Stream, StreamsAcc) of case maps:is_key(Stream, StreamsAcc) of
true -> true ->
%% We prefer our own progress %% We prefer our own progress
{AddEventAcc, StreamsAcc}; {AddEventAcc, StreamsAcc};
false -> false ->
{ {
[#{type => lease, stream => Stream, iterator => It} | AddEventAcc], [#{type => lease, stream => Stream, progress => Progress} | AddEventAcc],
StreamsAcc#{Stream => It} StreamsAcc#{Stream => Progress}
} }
end end
end, end,
@ -310,6 +312,10 @@ handle_leader_update_streams(
maps:keys(Streams1) maps:keys(Streams1)
), ),
StreamLeaseEvents = AddEvents ++ RevokeEvents, StreamLeaseEvents = AddEvents ++ RevokeEvents,
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
id => Id,
stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents)
}),
transition( transition(
GSM, GSM,
?updating, ?updating,
@ -540,11 +546,11 @@ run_enter_callback(#{state := ?disconnected} = GSM) ->
progresses_to_lease_events(StreamProgresses) -> progresses_to_lease_events(StreamProgresses) ->
lists:map( lists:map(
fun(#{stream := Stream, iterator := It}) -> fun(#{stream := Stream, progress := Progress}) ->
#{ #{
type => lease, type => lease,
stream => Stream, stream => Stream,
iterator => It progress => Progress
} }
end, end,
StreamProgresses StreamProgresses
@ -552,8 +558,8 @@ progresses_to_lease_events(StreamProgresses) ->
progresses_to_map(StreamProgresses) -> progresses_to_map(StreamProgresses) ->
lists:foldl( lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) -> fun(#{stream := Stream, progress := Progress}, Acc) ->
Acc#{Stream => It} Acc#{Stream => Progress}
end, end,
#{}, #{},
StreamProgresses StreamProgresses

View File

@ -49,8 +49,10 @@
revoked_streams := list(emqx_ds:stream()) revoked_streams := list(emqx_ds:stream())
}. }.
-type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-type stream_state() :: #{ -type stream_state() :: #{
iterator => emqx_ds:iterator(), progress => progress(),
rank => emqx_ds:stream_rank() rank => emqx_ds:stream_rank()
}. }.
@ -84,7 +86,8 @@
-export_type([ -export_type([
options/0, options/0,
data/0 data/0,
progress/0
]). ]).
%% States %% States
@ -310,8 +313,12 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) ->
{ok, It} = emqx_ds:make_iterator( {ok, It} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
), ),
Progress = #{
iterator => It,
acked => true
},
{ {
NewStreamStatesAcc#{Stream => #{iterator => It, rank => Rank}}, NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}},
OldStreamStatesAcc OldStreamStatesAcc
} }
end end
@ -637,18 +644,18 @@ update_stream_progresses(
ReceivedStreamProgresses ReceivedStreamProgresses
) -> ) ->
{StreamStates1, ReplayedStreams} = lists:foldl( {StreamStates1, ReplayedStreams} = lists:foldl(
fun(#{stream := Stream, iterator := It}, {StreamStatesAcc, ReplayedStreamsAcc}) -> fun(#{stream := Stream, progress := Progress}, {StreamStatesAcc, ReplayedStreamsAcc}) ->
case StreamOwners of case StreamOwners of
#{Stream := Agent} -> #{Stream := Agent} ->
StreamData0 = maps:get(Stream, StreamStatesAcc), StreamData0 = maps:get(Stream, StreamStatesAcc),
case It of case Progress of
end_of_stream -> #{iterator := end_of_stream} ->
Rank = maps:get(rank, StreamData0), Rank = maps:get(rank, StreamData0),
{maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{ {maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{
Stream => Rank Stream => Rank
}}; }};
_ -> _ ->
StreamData1 = StreamData0#{iterator => It}, StreamData1 = StreamData0#{progress => Progress},
{StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} {StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc}
end; end;
_ -> _ ->
@ -701,6 +708,9 @@ clean_revoked_streams(
( (
#{ #{
stream := Stream, stream := Stream,
progress := #{
acked := true
},
use_finished := true use_finished := true
} }
) -> ) ->
@ -953,7 +963,7 @@ stream_progresses(#{stream_states := StreamStates} = _Data, Streams) ->
StreamData = maps:get(Stream, StreamStates), StreamData = maps:get(Stream, StreamStates),
#{ #{
stream => Stream, stream => Stream,
iterator => maps:get(iterator, StreamData) progress => maps:get(progress, StreamData)
} }
end, end,
Streams Streams

View File

@ -22,10 +22,12 @@
]). ]).
-export([ -export([
format_streams/1, format_stream_progresses/1,
format_stream/1, format_stream_progress/1,
format_stream_key/1, format_stream_key/1,
format_stream_keys/1, format_stream_keys/1,
format_lease_event/1,
format_lease_events/1,
agent/2 agent/2
]). ]).
@ -38,23 +40,19 @@
id := emqx_persistent_session_ds:id() id := emqx_persistent_session_ds:id()
}. }.
-type stream_progress() :: #{ -type leader_stream_progress() :: #{
stream := emqx_ds:stream(), stream := emqx_ds:stream(),
iterator := emqx_ds:iterator() progress := emqx_persistent_session_ds_shared_subs:progress()
}. }.
-type agent_stream_progress() :: #{ -type agent_stream_progress() :: emqx_persistent_session_ds_shared_subs:agent_stream_progress().
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([ -export_type([
agent/0, agent/0,
leader/0, leader/0,
group/0, group/0,
version/0, version/0,
stream_progress/0, leader_stream_progress/0,
agent_stream_progress/0, agent_stream_progress/0,
agent_metadata/0 agent_metadata/0
]). ]).
@ -91,7 +89,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
type => agent_update_stream_states, type => agent_update_stream_states,
to_leader => ToLeader, to_leader => ToLeader,
from_agent => FromAgent, from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses), stream_progresses => format_stream_progresses(StreamProgresses),
version => Version version => Version
}), }),
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
@ -111,7 +109,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
type => agent_update_stream_states, type => agent_update_stream_states,
to_leader => ToLeader, to_leader => ToLeader,
from_agent => FromAgent, from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses), stream_progresses => format_stream_progresses(StreamProgresses),
version_old => VersionOld, version_old => VersionOld,
version_new => VersionNew version_new => VersionNew
}), }),
@ -131,7 +129,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
type => agent_disconnect, type => agent_disconnect,
to_leader => ToLeader, to_leader => ToLeader,
from_agent => FromAgent, from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses), stream_progresses => format_stream_progresses(StreamProgresses),
version => Version version => Version
}), }),
_ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)), _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
@ -143,14 +141,15 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
%% leader -> agent messages %% leader -> agent messages
-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) ->
ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_lease_streams, type => leader_lease_streams,
to_agent => ToAgent, to_agent => ToAgent,
of_group => OfGroup, of_group => OfGroup,
leader => Leader, leader => Leader,
streams => format_streams(Streams), streams => format_stream_progresses(Streams),
version => Version version => Version
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
@ -200,7 +199,8 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
). ).
-spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok. -spec leader_update_streams(agent(), group(), version(), version(), list(leader_stream_progress())) ->
ok.
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
?is_local_agent(ToAgent) ?is_local_agent(ToAgent)
-> ->
@ -210,7 +210,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
of_group => OfGroup, of_group => OfGroup,
version_old => VersionOld, version_old => VersionOld,
version_new => VersionNew, version_new => VersionNew,
streams_new => format_streams(StreamsNew) streams_new => format_stream_progresses(StreamsNew)
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent), ?agent_pid(ToAgent),
@ -247,14 +247,17 @@ agent(Id, Pid) ->
_ = Id, _ = Id,
?agent(Id, Pid). ?agent(Id, Pid).
format_streams(Streams) -> format_stream_progresses(Streams) ->
lists:map( lists:map(
fun format_stream/1, fun format_stream_progress/1,
Streams Streams
). ).
format_stream(#{stream := Stream, iterator := Iterator} = Value) -> format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key({SubId, Stream}) -> format_stream_key({SubId, Stream}) ->
{SubId, format_opaque(Stream)}. {SubId, format_opaque(Stream)}.
@ -265,6 +268,17 @@ format_stream_keys(StreamKeys) ->
StreamKeys StreamKeys
). ).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -82,7 +82,7 @@ agent_disconnect(Node, ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(), emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:leader(), emqx_ds_shared_sub_proto:leader(),
list(emqx_ds_shared_sub_proto:stream_progress()), list(emqx_ds_shared_sub_proto:leader_stream_progress()),
emqx_ds_shared_sub_proto:version() emqx_ds_shared_sub_proto:version()
) -> ok. ) -> ok.
leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) -> leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) ->
@ -117,7 +117,7 @@ leader_renew_stream_lease(Node, ToAgent, OfGroup, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto:group(), emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version(), emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version(), emqx_ds_shared_sub_proto:version(),
list(emqx_ds_shared_sub_proto:stream_progress()) list(emqx_ds_shared_sub_proto:leader_stream_progress())
) -> ok. ) -> ok.
leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [ erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [

View File

@ -183,6 +183,51 @@ t_graceful_disconnect(_Config) ->
ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub). ok = emqtt:disconnect(ConnPub).
t_disconnect_no_double_replay(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ok = emqtt:disconnect(ConnShared2),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnPub).
t_intensive_reassign(_Config) -> t_intensive_reassign(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>), ConnPub = emqtt_connect_pub(<<"client_pub">>),