diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 196d667c6..f0d194dfc 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -48,28 +48,37 @@ revoked_streams := list(emqx_ds:stream()) }. +-type stream_state() :: #{ + iterator => emqx_ds:iterator(), + rank => emqx_ds:stream_rank() +}. + +%% TODO https://emqx.atlassian.net/browse/EMQX-12307 +%% Some data should be persisted -type data() :: #{ + %% + %% Persistent data + %% group := emqx_types:group(), topic := emqx_types:topic(), %% For ds router, not an actual session_id router_id := binary(), - %% TODO https://emqx.atlassian.net/browse/EMQX-12307 - %% Persist progress %% TODO https://emqx.atlassian.net/browse/EMQX-12575 %% Implement some stats to assign evenly? - stream_progresses := #{ - emqx_ds:stream() => #{ - iterator => emqx_ds:iterator(), - rank => emqx_ds:stream_rank() - } + stream_states := #{ + emqx_ds:stream() => stream_state() }, + rank_progress := emqx_ds_shared_sub_leader_rank_progress:t(), + + %% + %% Ephimeral data, should not be persisted + %% agents := #{ emqx_ds_shared_sub_proto:agent() => agent_state() }, stream_owners := #{ emqx_ds:stream() => emqx_ds_shared_sub_proto:agent() - }, - rank_progress := emqx_ds_shared_sub_leader_rank_progress:t() + } }. -export_type([ @@ -141,7 +150,7 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> topic => Topic, router_id => gen_router_id(), start_time => now_ms() - ?START_TIME_THRESHOLD, - stream_progresses => #{}, + stream_states => #{}, stream_owners => #{}, agents => #{}, rank_progress => emqx_ds_shared_sub_leader_rank_progress:init() @@ -262,7 +271,7 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> renew_streams( #{ start_time := StartTime, - stream_progresses := Progresses, + stream_states := StreamStates, topic := Topic, rank_progress := RankProgress0 } = Data0 @@ -274,11 +283,11 @@ renew_streams( {NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams( StreamsWRanks, RankProgress0 ), - {NewProgresses, VanishedProgresses} = update_progresses( - Progresses, NewStreamsWRanks, TopicFilter, StartTime + {NewStreamStates, VanishedStreamStates} = update_progresses( + StreamStates, NewStreamsWRanks, TopicFilter, StartTime ), - Data1 = removed_vanished_streams(Data0, VanishedProgresses), - Data2 = Data1#{stream_progresses => NewProgresses, rank_progress => RankProgress1}, + Data1 = removed_vanished_streams(Data0, VanishedStreamStates), + Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1}, Data3 = revoke_streams(Data2), Data4 = assign_streams(Data3), ?SLOG(info, #{ @@ -288,23 +297,26 @@ renew_streams( }), Data4. -update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) -> +update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) -> lists:foldl( - fun({Rank, Stream}, {NewProgressesAcc, OldProgressesAcc}) -> - case OldProgressesAcc of + fun({Rank, Stream}, {NewStreamStatesAcc, OldStreamStatesAcc}) -> + case OldStreamStatesAcc of #{Stream := StreamData} -> { - NewProgressesAcc#{Stream => StreamData}, - maps:remove(Stream, OldProgressesAcc) + NewStreamStatesAcc#{Stream => StreamData}, + maps:remove(Stream, OldStreamStatesAcc) }; _ -> {ok, It} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), - {NewProgressesAcc#{Stream => #{iterator => It, rank => Rank}}, OldProgressesAcc} + { + NewStreamStatesAcc#{Stream => #{iterator => It, rank => Rank}}, + OldStreamStatesAcc + } end end, - {#{}, Progresses}, + {#{}, StreamStates}, NewStreamsWRanks ). @@ -316,8 +328,8 @@ update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) -> %% %% If streams disappear after long leader sleep, it is a normal situation. %% This removal will be a part of initialization before any agents connect. -removed_vanished_streams(Data0, VanishedProgresses) -> - VanishedStreams = maps:keys(VanishedProgresses), +removed_vanished_streams(Data0, VanishedStreamStates) -> + VanishedStreams = maps:keys(VanishedStreamStates), Data1 = lists:foldl( fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) -> case StreamOwners0 of @@ -608,33 +620,35 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) -> end. update_stream_progresses( - #{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data0, + #{stream_states := StreamStates0, stream_owners := StreamOwners} = Data0, Agent, AgentState0, ReceivedStreamProgresses ) -> - {StreamProgresses1, ReplayedStreams} = lists:foldl( - fun(#{stream := Stream, iterator := It}, {ProgressesAcc, ReplayedStreamsAcc}) -> + {StreamStates1, ReplayedStreams} = lists:foldl( + fun(#{stream := Stream, iterator := It}, {StreamStatesAcc, ReplayedStreamsAcc}) -> case StreamOwners of #{Stream := Agent} -> - StreamData0 = maps:get(Stream, ProgressesAcc), + StreamData0 = maps:get(Stream, StreamStatesAcc), case It of end_of_stream -> Rank = maps:get(rank, StreamData0), - {maps:remove(Stream, ProgressesAcc), ReplayedStreamsAcc#{Stream => Rank}}; + {maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{ + Stream => Rank + }}; _ -> StreamData1 = StreamData0#{iterator => It}, - {ProgressesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} + {StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} end; _ -> - {ProgressesAcc, ReplayedStreamsAcc} + {StreamStatesAcc, ReplayedStreamsAcc} end end, - {StreamProgresses0, #{}}, + {StreamStates0, #{}}, ReceivedStreamProgresses ), Data1 = update_rank_progress(Data0, ReplayedStreams), - Data2 = Data1#{stream_progresses => StreamProgresses1}, + Data2 = Data1#{stream_states => StreamStates1}, AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams), {Data2, AgentState1}. @@ -864,8 +878,8 @@ gen_router_id() -> now_ms() -> erlang:system_time(millisecond). -unassigned_streams(#{stream_progresses := StreamProgresses, stream_owners := StreamOwners}) -> - Streams = maps:keys(StreamProgresses), +unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) -> + Streams = maps:keys(StreamStates), AssignedStreams = maps:keys(StreamOwners), Streams -- AssignedStreams. @@ -887,12 +901,12 @@ desired_stream_count_per_agent(#{agents := AgentStates} = Data) -> desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) -> desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1). -desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> +desired_stream_count_per_agent(#{stream_states := StreamStates}, AgentCount) -> case AgentCount of 0 -> 0; _ -> - StreamCount = maps:size(StreamProgresses), + StreamCount = maps:size(StreamStates), case StreamCount rem AgentCount of 0 -> StreamCount div AgentCount; @@ -901,10 +915,10 @@ desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCo end end. -stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) -> +stream_progresses(#{stream_states := StreamStates} = _Data, Streams) -> lists:map( fun(Stream) -> - StreamData = maps:get(Stream, StreamProgresses), + StreamData = maps:get(Stream, StreamStates), #{ stream => Stream, iterator => maps:get(iterator, StreamData)