From 7daab1ab2325f394b20e8fcdc50539ab6dcb395e Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 8 Jul 2024 19:57:36 +0300 Subject: [PATCH] feat(queue): move replay progress to a separate data structure --- apps/emqx/src/emqx_persistent_session_ds.erl | 7 +- ...emqx_persistent_session_ds_shared_subs.erl | 184 ++++++++++++++---- .../src/emqx_ds_shared_sub_group_sm.erl | 30 +-- .../src/emqx_ds_shared_sub_leader.erl | 26 ++- .../src/emqx_ds_shared_sub_proto.erl | 56 ++++-- .../src/proto/emqx_ds_shared_sub_proto_v1.erl | 4 +- .../test/emqx_ds_shared_sub_SUITE.erl | 45 +++++ 7 files changed, 267 insertions(+), 85 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 517681f9a..124b1919a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -993,11 +993,12 @@ do_ensure_all_iterators_closed(_DSSessionID) -> fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) -> {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0), - LFS = maps:get(last_fetched_stream, Session0, beginning), + Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1}, + LFS = maps:get(last_fetched_stream, Session1, beginning), ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1), BatchSize = get_config(ClientInfo, [batch_size]), - Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo), - Session1#{shared_sub_s => SharedSubS1}. + Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo), + Session2#{shared_sub_s => SharedSubS1}. fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) -> #{inflight := Inflight} = Session0, diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 0bdbff30a..bb4c62726 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -48,6 +48,8 @@ to_map/2 ]). +-define(EPOCH_BITS, 15). + -define(schedule_subscribe, schedule_subscribe). -define(schedule_unsubscribe, schedule_unsubscribe). @@ -58,10 +60,22 @@ -type agent_stream_progress() :: #{ stream := emqx_ds:stream(), - iterator := emqx_ds:iterator(), + progress := progress(), use_finished := boolean() }. +-type progress() :: + #{ + acked := true, + iterator := emqx_ds:iterator() + } + | #{ + acked := false, + iterator := emqx_ds:iterator(), + qos1_acked := boolean(), + qos2_acked := boolean() + }. + -type scheduled_action() :: #{ type := scheduled_action_type(), stream_keys_to_wait := [stream_key()], @@ -82,6 +96,11 @@ -define(rank_x, rank_shared). -define(rank_y, 0). +-export_type([ + progress/0, + agent_stream_progress/0 +]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -290,7 +309,9 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( Agent0 ), - ?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}), + ?tp(warning, shared_subs_new_stream_lease_events, #{ + stream_lease_events => format_lease_events(StreamLeaseEvents) + }), S1 = lists:foldl( fun (#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions); @@ -317,8 +338,11 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) -> accept_stream(Event, S) end. +%% TODO: +%% handle unacked iterator accept_stream( - #{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0 + #{topic_filter := TopicFilter, stream := Stream, progress := #{iterator := Iterator}} = _Event, + S0 ) -> case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> @@ -326,8 +350,17 @@ accept_stream( S0; #{id := SubId, current_state := SStateId} -> Key = {SubId, Stream}, - case emqx_persistent_session_ds_state:get_stream(Key, S0) of - undefined -> + NeedCreateStream = + case emqx_persistent_session_ds_state:get_stream(Key, S0) of + undefined -> + true; + #srs{unsubscribed = true} -> + true; + _SRS -> + false + end, + case NeedCreateStream of + true -> NewSRS = #srs{ rank_x = ?rank_x, @@ -338,7 +371,7 @@ accept_stream( }, S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0), S1; - _SRS -> + false -> S0 end end. @@ -371,22 +404,30 @@ revoke_stream( emqx_persistent_session_ds_state:t(), t() ) -> {emqx_persistent_session_ds_state:t(), t()}. -on_streams_replay(S, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0) -> - Progresses = stream_progresses(S), +on_streams_replay(S0, SharedSubS0) -> + {S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} = + renew_streams(S0, SharedSubS0), + + Progresses = all_stream_progresses(S1, Agent0), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( Agent0, Progresses ), - {Agent2, ScheduledActions1} = run_scheduled_actions(S, Agent1, ScheduledActions0), - SharedSubS1 = SharedSubS0#{ + {Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0), + SharedSubS2 = SharedSubS1#{ agent => Agent2, scheduled_actions => ScheduledActions1 }, - {S, SharedSubS1}. + {S1, SharedSubS2}. %%-------------------------------------------------------------------- %% on_streams_replay internal functions -stream_progresses(S) -> +all_stream_progresses(S, Agent) -> + all_stream_progresses(S, Agent, _NeedUnacked = false). + +all_stream_progresses(S, _Agent, NeedUnacked) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), fold_shared_stream_states( fun( #share{group = Group}, @@ -394,9 +435,12 @@ stream_progresses(S) -> SRS, ProgressesAcc0 ) -> - case is_stream_fully_acked(S, SRS) of + case + is_stream_started(CommQos1, CommQos2, SRS) and + (NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS)) + of true -> - StreamProgress = stream_progress(S, Stream, SRS), + StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS), maps:update_with( Group, fun(Progresses) -> [StreamProgress | Progresses] end, @@ -437,7 +481,7 @@ run_scheduled_action( [] -> ?tp(warning, shared_subs_schedule_action_complete, #{ topic_filter => TopicFilter, - progresses => format_streams(Progresses1), + progresses => format_stream_progresses(Progresses1), type => Type }), %% Regular progress won't se unsubscribed streams, so we need to @@ -467,6 +511,8 @@ run_scheduled_action( end. filter_unfinished_streams(S, StreamKeysToWait) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), lists:filter( fun(Key) -> case emqx_persistent_session_ds_state:get_stream(Key, S) of @@ -475,21 +521,19 @@ filter_unfinished_streams(S, StreamKeysToWait) -> %% in completed state before deletion true; SRS -> - not is_stream_fully_acked(S, SRS) + not is_stream_fully_acked(CommQos1, CommQos2, SRS) end end, StreamKeysToWait ). stream_progresses(S, StreamKeys) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), lists:map( fun({_SubId, Stream} = Key) -> - #srs{it_end = ItEnd} = SRS = emqx_persistent_session_ds_state:get_stream(Key, S), - #{ - stream => Stream, - iterator => ItEnd, - use_finished => is_use_finished(S, SRS) - } + SRS = emqx_persistent_session_ds_state:get_stream(Key, S), + stream_progress(CommQos1, CommQos2, Stream, SRS) end, StreamKeys ). @@ -499,7 +543,7 @@ stream_progresses(S, StreamKeys) -> on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> S1 = revoke_all_streams(S0), - Progresses = stream_progresses(S1), + Progresses = all_stream_progresses(S1, Agent0), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses), SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}}, {S1, SharedSubS1}. @@ -565,12 +609,41 @@ stream_keys_by_sub_id(S, MatchSubId) -> S ). -stream_progress(S, Stream, #srs{it_end = EndIt} = SRS) -> - #{ - stream => Stream, - iterator => EndIt, - use_finished => is_use_finished(S, SRS) - }. +stream_progress( + CommQos1, + CommQos2, + Stream, + #srs{ + it_end = EndIt, + it_begin = BeginIt, + first_seqno_qos1 = StartQos1, + first_seqno_qos2 = StartQos2 + } = SRS +) -> + Qos1Acked = seqno_diff(?QOS_1, CommQos1, StartQos1), + Qos2Acked = seqno_diff(?QOS_2, CommQos2, StartQos2), + case is_stream_fully_acked(CommQos1, CommQos2, SRS) of + true -> + #{ + stream => Stream, + progress => #{ + acked => true, + iterator => EndIt + }, + use_finished => is_use_finished(SRS) + }; + false -> + #{ + stream => Stream, + progress => #{ + acked => false, + iterator => BeginIt, + qos1_acked => Qos1Acked, + qos2_acked => Qos2Acked + }, + use_finished => is_use_finished(SRS) + } + end. fold_shared_subs(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( @@ -618,11 +691,30 @@ agent_opts(#{session_id := SessionId}) -> now_ms() -> erlang:system_time(millisecond). -is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) -> +is_use_finished(#srs{unsubscribed = Unsubscribed}) -> Unsubscribed. -is_stream_fully_acked(S, SRS) -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). +is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) -> + (CommQos1 >= Q1) or (CommQos2 >= Q2). + +is_stream_fully_acked(_, _, #srs{ + first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2 +}) -> + %% Streams where the last chunk doesn't contain any QoS1 and 2 + %% messages are considered fully acked: + true; +is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> + (Comm1 >= S1) andalso (Comm2 >= S2). + +-dialyzer({nowarn_function, seqno_diff/3}). +seqno_diff(?QOS_1, A, B) -> + %% For QoS1 messages we skip a seqno every time the epoch changes, + %% we need to substract that from the diff: + EpochA = A bsr ?EPOCH_BITS, + EpochB = B bsr ?EPOCH_BITS, + A - B - (EpochA - EpochB); +seqno_diff(?QOS_2, A, B) -> + A - B. %%-------------------------------------------------------------------- %% Formatters @@ -633,21 +725,24 @@ format_schedule_action(#{ }) -> #{ type => Type, - progresses => format_streams(Progresses), + progresses => format_stream_progresses(Progresses), stream_keys_to_wait => format_stream_keys(StreamKeysToWait) }. -format_streams(Streams) -> +format_stream_progresses(Streams) -> lists:map( - fun format_stream/1, + fun format_stream_progress/1, Streams ). -format_stream(#{stream := Stream, iterator := Iterator} = Value) -> - Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. +format_stream_progress(#{stream := Stream, progress := Progress} = Value) -> + Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}. -format_stream_key({SubId, Stream}) -> - {SubId, format_opaque(Stream)}. +format_progress(#{iterator := Iterator} = Progress) -> + Progress#{iterator => format_opaque(Iterator)}. + +format_stream_key(beginning) -> beginning; +format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}. format_stream_keys(StreamKeys) -> lists:map( @@ -655,5 +750,16 @@ format_stream_keys(StreamKeys) -> StreamKeys ). +format_lease_events(Events) -> + lists:map( + fun format_lease_event/1, + Events + ). + +format_lease_event(#{stream := Stream, progress := Progress} = Event) -> + Event#{stream => format_opaque(Stream), progress => format_progress(Progress)}; +format_lease_event(#{stream := Stream} = Event) -> + Event#{stream => format_opaque(Stream)}. + format_opaque(Opaque) -> erlang:phash2(Opaque). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 81bca367a..7d260dc0b 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -45,11 +45,13 @@ send_after := fun((non_neg_integer(), term()) -> reference()) }. +-type progress() :: emqx_persistent_session_ds_shared_subs:progress(). + -type stream_lease_event() :: #{ type => lease, stream => emqx_ds:stream(), - iterator => emqx_ds:iterator() + progress => progress() } | #{ type => revoke, @@ -60,7 +62,7 @@ #{ type => lease, stream => emqx_ds:stream(), - iterator => emqx_ds:iterator(), + progress => progress(), topic_filter => emqx_persistent_session_ds:share_topic_filter() } | #{ @@ -81,13 +83,13 @@ -type connecting_data() :: #{}. -type replaying_data() :: #{ leader => emqx_ds_shared_sub_proto:leader(), - streams => #{emqx_ds:stream() => emqx_ds:iterator()}, + streams => #{emqx_ds:stream() => progress()}, version => emqx_ds_shared_sub_proto:version(), prev_version => undefined }. -type updating_data() :: #{ leader => emqx_ds_shared_sub_proto:leader(), - streams => #{emqx_ds:stream() => emqx_ds:iterator()}, + streams => #{emqx_ds:stream() => progress()}, version => emqx_ds_shared_sub_proto:version(), prev_version => emqx_ds_shared_sub_proto:version() }. @@ -275,18 +277,18 @@ handle_leader_update_streams( id => Id, version_old => VersionOld, version_new => VersionNew, - stream_progresses => emqx_ds_shared_sub_proto:format_streams(StreamProgresses) + stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses) }), {AddEvents, Streams1} = lists:foldl( - fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) -> + fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) -> case maps:is_key(Stream, StreamsAcc) of true -> %% We prefer our own progress {AddEventAcc, StreamsAcc}; false -> { - [#{type => lease, stream => Stream, iterator => It} | AddEventAcc], - StreamsAcc#{Stream => It} + [#{type => lease, stream => Stream, progress => Progress} | AddEventAcc], + StreamsAcc#{Stream => Progress} } end end, @@ -310,6 +312,10 @@ handle_leader_update_streams( maps:keys(Streams1) ), StreamLeaseEvents = AddEvents ++ RevokeEvents, + ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + id => Id, + stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents) + }), transition( GSM, ?updating, @@ -540,11 +546,11 @@ run_enter_callback(#{state := ?disconnected} = GSM) -> progresses_to_lease_events(StreamProgresses) -> lists:map( - fun(#{stream := Stream, iterator := It}) -> + fun(#{stream := Stream, progress := Progress}) -> #{ type => lease, stream => Stream, - iterator => It + progress => Progress } end, StreamProgresses @@ -552,8 +558,8 @@ progresses_to_lease_events(StreamProgresses) -> progresses_to_map(StreamProgresses) -> lists:foldl( - fun(#{stream := Stream, iterator := It}, Acc) -> - Acc#{Stream => It} + fun(#{stream := Stream, progress := Progress}, Acc) -> + Acc#{Stream => Progress} end, #{}, StreamProgresses diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 143eed1fe..976ce2437 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -49,8 +49,10 @@ revoked_streams := list(emqx_ds:stream()) }. +-type progress() :: emqx_persistent_session_ds_shared_subs:progress(). + -type stream_state() :: #{ - iterator => emqx_ds:iterator(), + progress => progress(), rank => emqx_ds:stream_rank() }. @@ -84,7 +86,8 @@ -export_type([ options/0, - data/0 + data/0, + progress/0 ]). %% States @@ -310,8 +313,12 @@ update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) -> {ok, It} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), + Progress = #{ + iterator => It, + acked => true + }, { - NewStreamStatesAcc#{Stream => #{iterator => It, rank => Rank}}, + NewStreamStatesAcc#{Stream => #{progress => Progress, rank => Rank}}, OldStreamStatesAcc } end @@ -637,18 +644,18 @@ update_stream_progresses( ReceivedStreamProgresses ) -> {StreamStates1, ReplayedStreams} = lists:foldl( - fun(#{stream := Stream, iterator := It}, {StreamStatesAcc, ReplayedStreamsAcc}) -> + fun(#{stream := Stream, progress := Progress}, {StreamStatesAcc, ReplayedStreamsAcc}) -> case StreamOwners of #{Stream := Agent} -> StreamData0 = maps:get(Stream, StreamStatesAcc), - case It of - end_of_stream -> + case Progress of + #{iterator := end_of_stream} -> Rank = maps:get(rank, StreamData0), {maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{ Stream => Rank }}; _ -> - StreamData1 = StreamData0#{iterator => It}, + StreamData1 = StreamData0#{progress => Progress}, {StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} end; _ -> @@ -701,6 +708,9 @@ clean_revoked_streams( ( #{ stream := Stream, + progress := #{ + acked := true + }, use_finished := true } ) -> @@ -953,7 +963,7 @@ stream_progresses(#{stream_states := StreamStates} = _Data, Streams) -> StreamData = maps:get(Stream, StreamStates), #{ stream => Stream, - iterator => maps:get(iterator, StreamData) + progress => maps:get(progress, StreamData) } end, Streams diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 184e8d147..e74fae19c 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -22,10 +22,12 @@ ]). -export([ - format_streams/1, - format_stream/1, + format_stream_progresses/1, + format_stream_progress/1, format_stream_key/1, format_stream_keys/1, + format_lease_event/1, + format_lease_events/1, agent/2 ]). @@ -38,23 +40,19 @@ id := emqx_persistent_session_ds:id() }. --type stream_progress() :: #{ +-type leader_stream_progress() :: #{ stream := emqx_ds:stream(), - iterator := emqx_ds:iterator() + progress := emqx_persistent_session_ds_shared_subs:progress() }. --type agent_stream_progress() :: #{ - stream := emqx_ds:stream(), - iterator := emqx_ds:iterator(), - use_finished := boolean() -}. +-type agent_stream_progress() :: emqx_persistent_session_ds_shared_subs:agent_stream_progress(). -export_type([ agent/0, leader/0, group/0, version/0, - stream_progress/0, + leader_stream_progress/0, agent_stream_progress/0, agent_metadata/0 ]). @@ -91,7 +89,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when type => agent_update_stream_states, to_leader => ToLeader, from_agent => FromAgent, - stream_progresses => format_streams(StreamProgresses), + stream_progresses => format_stream_progresses(StreamProgresses), version => Version }), _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), @@ -111,7 +109,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve type => agent_update_stream_states, to_leader => ToLeader, from_agent => FromAgent, - stream_progresses => format_streams(StreamProgresses), + stream_progresses => format_stream_progresses(StreamProgresses), version_old => VersionOld, version_new => VersionNew }), @@ -131,7 +129,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when type => agent_disconnect, to_leader => ToLeader, from_agent => FromAgent, - stream_progresses => format_streams(StreamProgresses), + stream_progresses => format_stream_progresses(StreamProgresses), version => Version }), _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)), @@ -143,14 +141,15 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> %% leader -> agent messages --spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. +-spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) -> + ok. leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> ?tp(warning, shared_sub_proto_msg, #{ type => leader_lease_streams, to_agent => ToAgent, of_group => OfGroup, leader => Leader, - streams => format_streams(Streams), + streams => format_stream_progresses(Streams), version => Version }), _ = emqx_persistent_session_ds_shared_subs_agent:send( @@ -200,7 +199,8 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew ). --spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok. +-spec leader_update_streams(agent(), group(), version(), version(), list(leader_stream_progress())) -> + ok. leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when ?is_local_agent(ToAgent) -> @@ -210,7 +210,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when of_group => OfGroup, version_old => VersionOld, version_new => VersionNew, - streams_new => format_streams(StreamsNew) + streams_new => format_stream_progresses(StreamsNew) }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ?agent_pid(ToAgent), @@ -247,14 +247,17 @@ agent(Id, Pid) -> _ = Id, ?agent(Id, Pid). -format_streams(Streams) -> +format_stream_progresses(Streams) -> lists:map( - fun format_stream/1, + fun format_stream_progress/1, Streams ). -format_stream(#{stream := Stream, iterator := Iterator} = Value) -> - Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. +format_stream_progress(#{stream := Stream, progress := Progress} = Value) -> + Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}. + +format_progress(#{iterator := Iterator} = Progress) -> + Progress#{iterator => format_opaque(Iterator)}. format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}. @@ -265,6 +268,17 @@ format_stream_keys(StreamKeys) -> StreamKeys ). +format_lease_events(Events) -> + lists:map( + fun format_lease_event/1, + Events + ). + +format_lease_event(#{stream := Stream, progress := Progress} = Event) -> + Event#{stream => format_opaque(Stream), progress => format_progress(Progress)}; +format_lease_event(#{stream := Stream} = Event) -> + Event#{stream => format_opaque(Stream)}. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- diff --git a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl index 2dfc8be65..52f64937d 100644 --- a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl +++ b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl @@ -82,7 +82,7 @@ agent_disconnect(Node, ToLeader, FromAgent, StreamProgresses, Version) -> emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:group(), emqx_ds_shared_sub_proto:leader(), - list(emqx_ds_shared_sub_proto:stream_progress()), + list(emqx_ds_shared_sub_proto:leader_stream_progress()), emqx_ds_shared_sub_proto:version() ) -> ok. leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) -> @@ -117,7 +117,7 @@ leader_renew_stream_lease(Node, ToAgent, OfGroup, VersionOld, VersionNew) -> emqx_ds_shared_sub_proto:group(), emqx_ds_shared_sub_proto:version(), emqx_ds_shared_sub_proto:version(), - list(emqx_ds_shared_sub_proto:stream_progress()) + list(emqx_ds_shared_sub_proto:leader_stream_progress()) ) -> ok. leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [ diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 4733dc650..0f665b5a3 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -183,6 +183,51 @@ t_graceful_disconnect(_Config) -> ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnPub). +t_disconnect_no_double_replay(_Config) -> + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1), + + ct:sleep(1000), + + NPubs = 10_000, + + Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>], + ok = publish_n(ConnPub, Topics, 1, NPubs), + + Self = self(), + _ = spawn_link(fun() -> + ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs), + Self ! publish_done + end), + + ok = emqtt:disconnect(ConnShared2), + + receive + publish_done -> ok + end, + + Pubs = drain_publishes(), + + ClientByBid = fun(Pid) -> + case Pid of + ConnShared1 -> <<"client_shared1">>; + ConnShared2 -> <<"client_shared2">> + end + end, + + {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), + + ?assertEqual([], Missing), + ?assertEqual([], Duplicate), + + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnPub). + t_intensive_reassign(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>),