diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 8f6b7c683..196d667c6 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -58,14 +58,18 @@ %% TODO https://emqx.atlassian.net/browse/EMQX-12575 %% Implement some stats to assign evenly? stream_progresses := #{ - emqx_ds:stream() => emqx_ds:iterator() + emqx_ds:stream() => #{ + iterator => emqx_ds:iterator(), + rank => emqx_ds:stream_rank() + } }, agents := #{ emqx_ds_shared_sub_proto:agent() => agent_state() }, stream_owners := #{ emqx_ds:stream() => emqx_ds_shared_sub_proto:agent() - } + }, + rank_progress := emqx_ds_shared_sub_leader_rank_progress:t() }. -export_type([ @@ -139,7 +143,8 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> start_time => now_ms() - ?START_TIME_THRESHOLD, stream_progresses => #{}, stream_owners => #{}, - agents => #{} + agents => #{}, + rank_progress => emqx_ds_shared_sub_leader_rank_progress:init() }, {ok, ?leader_waiting_registration, Data}. 
@@ -254,37 +259,87 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> %% * Revoke streams from agents having too many streams %% * Assign streams to agents having too few streams -renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic := Topic} = Data0) -> +renew_streams( + #{ + start_time := StartTime, + stream_progresses := Progresses, + topic := Topic, + rank_progress := RankProgress0 + } = Data0 +) -> TopicFilter = emqx_topic:words(Topic), - {_, Streams} = lists:unzip( - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) + StreamsWRanks = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + + %% Discard streams that are already replayed and init new + {NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams( + StreamsWRanks, RankProgress0 ), - %% TODO https://emqx.atlassian.net/browse/EMQX-12572 - %% Handle stream removal - NewProgresses = lists:foldl( - fun(Stream, ProgressesAcc) -> - case ProgressesAcc of - #{Stream := _} -> - ProgressesAcc; + {NewProgresses, VanishedProgresses} = update_progresses( + Progresses, NewStreamsWRanks, TopicFilter, StartTime + ), + Data1 = removed_vanished_streams(Data0, VanishedProgresses), + Data2 = Data1#{stream_progresses => NewProgresses, rank_progress => RankProgress1}, + Data3 = revoke_streams(Data2), + Data4 = assign_streams(Data3), + ?SLOG(info, #{ + msg => leader_renew_streams, + topic_filter => TopicFilter, + new_streams => length(NewStreamsWRanks) + }), + Data4. 
+
+update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) ->
+    lists:foldl(
+        fun({Rank, Stream}, {NewProgressesAcc, OldProgressesAcc}) ->
+            case OldProgressesAcc of
+                #{Stream := StreamData} ->
+                    {
+                        NewProgressesAcc#{Stream => StreamData},
+                        maps:remove(Stream, OldProgressesAcc)
+                    };
                 _ ->
                     {ok, It} = emqx_ds:make_iterator(
                         ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
                     ),
-                    ProgressesAcc#{Stream => It}
+                    {NewProgressesAcc#{Stream => #{iterator => It, rank => Rank}}, OldProgressesAcc}
             end
         end,
-        Progresses,
-        Streams
+        {#{}, Progresses},
+        NewStreamsWRanks
+    ).
+
+%% We just remove disappeared streams from anywhere.
+%%
+%% If streams disappear from DS during leader being in replaying state
+%% this is an abnormal situation (we should receive `end_of_stream` first),
+%% but clients are unlikely to report any progress on them.
+%%
+%% If streams disappear after long leader sleep, it is a normal situation.
+%% This removal will be a part of initialization before any agents connect.
+removed_vanished_streams(Data0, VanishedProgresses) ->
+    VanishedStreams = maps:keys(VanishedProgresses),
+    Data1 = lists:foldl(
+        fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) ->
+            case StreamOwners0 of
+                #{Stream := Agent} ->
+                    #{streams := Streams0, revoked_streams := RevokedStreams0} =
+                        AgentState0 = get_agent_state(DataAcc, Agent),
+                    Streams1 = Streams0 -- [Stream],
+                    RevokedStreams1 = RevokedStreams0 -- [Stream],
+                    AgentState1 = AgentState0#{
+                        streams => Streams1,
+                        revoked_streams => RevokedStreams1
+                    },
+                    set_agent_state(DataAcc, Agent, AgentState1);
+                _ ->
+                    DataAcc
+            end
+        end,
+        Data0,
+        VanishedStreams
     ),
-    Data1 = Data0#{stream_progresses => NewProgresses},
-    ?SLOG(info, #{
-        msg => leader_renew_streams,
-        topic_filter => TopicFilter,
-        streams => length(Streams)
-    }),
-    Data2 = revoke_streams(Data1),
-    Data3 = assign_streams(Data2),
-    Data3.
+    Data2 = unassign_streams(Data1, VanishedStreams),
+    Data2.
%% We revoke streams from agents that have too many streams (> desired_stream_count_per_agent). %% We revoke only from replaying agents. @@ -528,15 +583,19 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) -> Data0; {?waiting_replaying, AgentVersion} -> %% Agent finished updating, now replaying - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - AgentState2 = agent_transition_to_replaying(Agent, AgentState1), - set_agent_state(Data1, Agent, AgentState2); + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses + ), + AgentState2 = update_agent_timeout(AgentState1), + AgentState3 = agent_transition_to_replaying(Agent, AgentState2), + set_agent_state(Data1, Agent, AgentState3); {?replaying, AgentVersion} -> %% Common case, agent is replaying - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - set_agent_state(Data1, Agent, AgentState1); + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses + ), + AgentState2 = update_agent_timeout(AgentState1), + set_agent_state(Data1, Agent, AgentState2); {OtherState, OtherVersion} -> ?tp(warning, unexpected_update, #{ agent => Agent, @@ -549,24 +608,63 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) -> end. 
update_stream_progresses( - #{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data, + #{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data0, Agent, + AgentState0, ReceivedStreamProgresses ) -> - StreamProgresses1 = lists:foldl( - fun(#{stream := Stream, iterator := It}, ProgressesAcc) -> + {StreamProgresses1, ReplayedStreams} = lists:foldl( + fun(#{stream := Stream, iterator := It}, {ProgressesAcc, ReplayedStreamsAcc}) -> case StreamOwners of #{Stream := Agent} -> - ProgressesAcc#{Stream => It}; + StreamData0 = maps:get(Stream, ProgressesAcc), + case It of + end_of_stream -> + Rank = maps:get(rank, StreamData0), + {maps:remove(Stream, ProgressesAcc), ReplayedStreamsAcc#{Stream => Rank}}; + _ -> + StreamData1 = StreamData0#{iterator => It}, + {ProgressesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} + end; _ -> - ProgressesAcc + {ProgressesAcc, ReplayedStreamsAcc} end end, - StreamProgresses0, + {StreamProgresses0, #{}}, ReceivedStreamProgresses ), - Data#{ - stream_progresses => StreamProgresses1 + Data1 = update_rank_progress(Data0, ReplayedStreams), + Data2 = Data1#{stream_progresses => StreamProgresses1}, + AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams), + {Data2, AgentState1}. + +update_rank_progress(#{rank_progress := RankProgress0} = Data, ReplayedStreams) -> + RankProgress1 = maps:fold( + fun(Stream, Rank, RankProgressAcc) -> + emqx_ds_shared_sub_leader_rank_progress:set_replayed({Rank, Stream}, RankProgressAcc) + end, + RankProgress0, + ReplayedStreams + ), + Data#{rank_progress => RankProgress1}. + +%% No need to revoke fully replayed streams. We do not assign them anymore. +%% The agent's session also will drop replayed streams itself. 
+filter_replayed_streams( + #{streams := Streams0, revoked_streams := RevokedStreams0} = AgentState0, + ReplayedStreams +) -> + Streams1 = lists:filter( + fun(Stream) -> not maps:is_key(Stream, ReplayedStreams) end, + Streams0 + ), + RevokedStreams1 = lists:filter( + fun(Stream) -> not maps:is_key(Stream, ReplayedStreams) end, + RevokedStreams0 + ), + AgentState0#{ + streams => Streams1, + revoked_streams => RevokedStreams1 }. clean_revoked_streams( @@ -613,41 +711,49 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers case {State, VersionOld, VersionNew} of {?waiting_updating, AgentPrevVersion, AgentVersion} -> %% Client started updating - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - {AgentState2, Data2} = clean_revoked_streams( - Data1, Agent, AgentState1, AgentStreamProgresses + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses ), - AgentState3 = - case AgentState2 of + AgentState2 = update_agent_timeout(AgentState1), + {AgentState3, Data2} = clean_revoked_streams( + Data1, Agent, AgentState2, AgentStreamProgresses + ), + AgentState4 = + case AgentState3 of #{revoked_streams := []} -> - agent_transition_to_waiting_replaying(Data1, Agent, AgentState2); + agent_transition_to_waiting_replaying(Data1, Agent, AgentState3); _ -> - agent_transition_to_updating(Agent, AgentState2) + agent_transition_to_updating(Agent, AgentState3) end, - set_agent_state(Data2, Agent, AgentState3); + set_agent_state(Data2, Agent, AgentState4); {?updating, AgentPrevVersion, AgentVersion} -> - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - {AgentState2, Data2} = clean_revoked_streams( - Data1, Agent, AgentState1, AgentStreamProgresses + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses ), - AgentState3 = - 
case AgentState2 of + AgentState2 = update_agent_timeout(AgentState1), + {AgentState3, Data2} = clean_revoked_streams( + Data1, Agent, AgentState2, AgentStreamProgresses + ), + AgentState4 = + case AgentState3 of #{revoked_streams := []} -> - agent_transition_to_waiting_replaying(Data1, Agent, AgentState2); + agent_transition_to_waiting_replaying(Data1, Agent, AgentState3); _ -> - AgentState2 + AgentState3 end, - set_agent_state(Data2, Agent, AgentState3); + set_agent_state(Data2, Agent, AgentState4); {?waiting_replaying, _, AgentVersion} -> - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - set_agent_state(Data1, Agent, AgentState1); + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses + ), + AgentState2 = update_agent_timeout(AgentState1), + set_agent_state(Data1, Agent, AgentState2); {?replaying, _, AgentVersion} -> - Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), - AgentState1 = update_agent_timeout(AgentState0), - set_agent_state(Data1, Agent, AgentState1); + {Data1, AgentState1} = update_stream_progresses( + Data0, Agent, AgentState0, AgentStreamProgresses + ), + AgentState2 = update_agent_timeout(AgentState1), + set_agent_state(Data1, Agent, AgentState2); {OtherState, OtherVersionOld, OtherVersionNew} -> ?tp(warning, unexpected_update, #{ agent => Agent, @@ -798,9 +904,10 @@ desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCo stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) -> lists:map( fun(Stream) -> + StreamData = maps:get(Stream, StreamProgresses), #{ stream => Stream, - iterator => maps:get(Stream, StreamProgresses) + iterator => maps:get(iterator, StreamData) } end, Streams diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl new file mode 100644 
index 000000000..689c4ba89
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl
@@ -0,0 +1,139 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Tracks, per shard (rank_x), which generations (rank_y) of streams are
+%% already fully replayed, so that replayed streams are not handed out again.
+-module(emqx_ds_shared_sub_leader_rank_progress).
+
+-include_lib("emqx/include/logger.hrl").
+
+-export([
+    init/0,
+    set_replayed/2,
+    add_streams/2
+]).
+
+-export_type([t/0]).
+
+%% "shard"
+-type rank_x() :: emqx_ds:rank_x().
+
+%% "generation"
+-type rank_y() :: emqx_ds:rank_y().
+
+-type stream_with_rank() :: {emqx_ds:stream_rank(), emqx_ds:stream()}.
+
+%% shard progress
+-type x_progress() :: #{
+    %% All streams with given rank_x and rank_y =< min_y are replayed.
+    min_y := rank_y(),
+
+    %% Streams of the generations above min_y that are still tracked.
+    ys := #{
+        rank_y() => #{
+            emqx_ds:stream() => _IsReplayed :: boolean()
+        }
+    }
+}.
+
+-type t() :: #{
+    rank_x() => x_progress()
+}.
+
+%% Create an empty progress tracker (no shards known yet).
+-spec init() -> t().
+init() -> #{}.
+
+%% Mark a tracked, not yet replayed stream as fully replayed and recompute
+%% `min_y` for its shard. Unknown or already replayed streams only produce
+%% a warning and leave the state untouched.
+-spec set_replayed(stream_with_rank(), t()) -> t().
+set_replayed({{RankX, RankY}, Stream}, State) ->
+    case State of
+        #{RankX := #{ys := #{RankY := #{Stream := false} = RankYStreams} = Ys0}} ->
+            Ys1 = Ys0#{RankY => RankYStreams#{Stream => true}},
+            %% update_min_y/1 walks generations in ascending order, and the
+            %% order of maps:to_list/1 is not guaranteed, so sort explicitly.
+            {MinY, Ys2} = update_min_y(lists:keysort(1, maps:to_list(Ys1))),
+            State#{RankX => #{min_y => MinY, ys => Ys2}};
+        _ ->
+            ?SLOG(warning, #{
+                msg => leader_rank_progress_double_or_invalid_update,
+                rank_x => RankX,
+                rank_y => RankY,
+                state => State
+            }),
+            State
+    end.
+
+%% Register streams reported by DS and return those that still need to be
+%% replayed, together with the updated state. Streams of generations at or
+%% below `min_y` and streams already marked as replayed are dropped.
+-spec add_streams([stream_with_rank()], t()) -> {[stream_with_rank()], t()}.
+add_streams(StreamsWithRanks, State) ->
+    %% Lowest generations first: the first stream seen for a new shard
+    %% defines that shard's initial `min_y`.
+    SortedStreamsWithRanks = lists:sort(
+        fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) ->
+            RankY1 =< RankY2
+        end,
+        StreamsWithRanks
+    ),
+    lists:foldl(
+        fun({Rank, Stream} = StreamWithRank, {StreamAcc, StateAcc0}) ->
+            case add_stream({Rank, Stream}, StateAcc0) of
+                {true, StateAcc1} ->
+                    {[StreamWithRank | StreamAcc], StateAcc1};
+                false ->
+                    {StreamAcc, StateAcc0}
+            end
+        end,
+        {[], State},
+        SortedStreamsWithRanks
+    ).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Track a single stream. Returns `false` when the stream is known to be
+%% replayed, `{true, NewState}` otherwise (including already tracked,
+%% not yet replayed streams).
+add_stream({{RankX, RankY}, Stream}, State0) ->
+    case State0 of
+        #{RankX := #{min_y := MinY}} when RankY =< MinY ->
+            false;
+        #{RankX := #{ys := #{RankY := #{Stream := true}}}} ->
+            false;
+        _ ->
+            XProgress = maps:get(RankX, State0, #{min_y => RankY - 1, ys => #{}}),
+            Ys0 = maps:get(ys, XProgress),
+            RankYStreams0 = maps:get(RankY, Ys0, #{}),
+            RankYStreams1 = RankYStreams0#{Stream => false},
+            Ys1 = Ys0#{RankY => RankYStreams1},
+            State1 = State0#{RankX => XProgress#{ys => Ys1}},
+            {true, State1}
+    end.
+
+%% Takes the generations of one shard sorted by rank_y (ascending) and returns
+%% `{MinY, RemainingYs}`: fully replayed leading generations are dropped and
+%% every generation with rank_y =< `MinY` is completely replayed.
+update_min_y([{RankY, RankYStreams} | Rest] = Ys) ->
+    case {has_unreplayed_streams(RankYStreams), Rest} of
+        {true, _} ->
+            %% RankY itself is still in progress, only lower ones are done.
+            {RankY - 1, maps:from_list(Ys)};
+        {false, []} ->
+            %% Every tracked generation, including RankY, is replayed.
+            {RankY, #{}};
+        {false, _} ->
+            update_min_y(Rest)
+    end.
+
+%% True if at least one stream of the generation is not replayed yet.
+has_unreplayed_streams(RankYStreams) ->
+    lists:any(
+        fun(IsReplayed) -> not IsReplayed end,
+        maps:values(RankYStreams)
+    ).