feat(queue): rename leader' stream_progresses to stream_states

This commit is contained in:
Ilya Averyanov 2024-07-03 23:21:21 +03:00
parent 7d004b37da
commit 53d4cd3174
1 changed files with 53 additions and 39 deletions

View File

@ -48,28 +48,37 @@
revoked_streams := list(emqx_ds:stream()) revoked_streams := list(emqx_ds:stream())
}. }.
-type stream_state() :: #{
iterator => emqx_ds:iterator(),
rank => emqx_ds:stream_rank()
}.
%% TODO https://emqx.atlassian.net/browse/EMQX-12307
%% Some data should be persisted
-type data() :: #{ -type data() :: #{
%%
%% Persistent data
%%
group := emqx_types:group(), group := emqx_types:group(),
topic := emqx_types:topic(), topic := emqx_types:topic(),
%% For ds router, not an actual session_id %% For ds router, not an actual session_id
router_id := binary(), router_id := binary(),
%% TODO https://emqx.atlassian.net/browse/EMQX-12307
%% Persist progress
%% TODO https://emqx.atlassian.net/browse/EMQX-12575 %% TODO https://emqx.atlassian.net/browse/EMQX-12575
%% Implement some stats to assign evenly? %% Implement some stats to assign evenly?
stream_progresses := #{ stream_states := #{
emqx_ds:stream() => #{ emqx_ds:stream() => stream_state()
iterator => emqx_ds:iterator(),
rank => emqx_ds:stream_rank()
}
}, },
rank_progress := emqx_ds_shared_sub_leader_rank_progress:t(),
%%
%% Ephimeral data, should not be persisted
%%
agents := #{ agents := #{
emqx_ds_shared_sub_proto:agent() => agent_state() emqx_ds_shared_sub_proto:agent() => agent_state()
}, },
stream_owners := #{ stream_owners := #{
emqx_ds:stream() => emqx_ds_shared_sub_proto:agent() emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
}, }
rank_progress := emqx_ds_shared_sub_leader_rank_progress:t()
}. }.
-export_type([ -export_type([
@ -141,7 +150,7 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
topic => Topic, topic => Topic,
router_id => gen_router_id(), router_id => gen_router_id(),
start_time => now_ms() - ?START_TIME_THRESHOLD, start_time => now_ms() - ?START_TIME_THRESHOLD,
stream_progresses => #{}, stream_states => #{},
stream_owners => #{}, stream_owners => #{},
agents => #{}, agents => #{},
rank_progress => emqx_ds_shared_sub_leader_rank_progress:init() rank_progress => emqx_ds_shared_sub_leader_rank_progress:init()
@ -262,7 +271,7 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
renew_streams( renew_streams(
#{ #{
start_time := StartTime, start_time := StartTime,
stream_progresses := Progresses, stream_states := StreamStates,
topic := Topic, topic := Topic,
rank_progress := RankProgress0 rank_progress := RankProgress0
} = Data0 } = Data0
@ -274,11 +283,11 @@ renew_streams(
{NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams( {NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams(
StreamsWRanks, RankProgress0 StreamsWRanks, RankProgress0
), ),
{NewProgresses, VanishedProgresses} = update_progresses( {NewStreamStates, VanishedStreamStates} = update_progresses(
Progresses, NewStreamsWRanks, TopicFilter, StartTime StreamStates, NewStreamsWRanks, TopicFilter, StartTime
), ),
Data1 = removed_vanished_streams(Data0, VanishedProgresses), Data1 = removed_vanished_streams(Data0, VanishedStreamStates),
Data2 = Data1#{stream_progresses => NewProgresses, rank_progress => RankProgress1}, Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1},
Data3 = revoke_streams(Data2), Data3 = revoke_streams(Data2),
Data4 = assign_streams(Data3), Data4 = assign_streams(Data3),
?SLOG(info, #{ ?SLOG(info, #{
@ -288,23 +297,26 @@ renew_streams(
}), }),
Data4. Data4.
update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) -> update_progresses(StreamStates, NewStreamsWRanks, TopicFilter, StartTime) ->
lists:foldl( lists:foldl(
fun({Rank, Stream}, {NewProgressesAcc, OldProgressesAcc}) -> fun({Rank, Stream}, {NewStreamStatesAcc, OldStreamStatesAcc}) ->
case OldProgressesAcc of case OldStreamStatesAcc of
#{Stream := StreamData} -> #{Stream := StreamData} ->
{ {
NewProgressesAcc#{Stream => StreamData}, NewStreamStatesAcc#{Stream => StreamData},
maps:remove(Stream, OldProgressesAcc) maps:remove(Stream, OldStreamStatesAcc)
}; };
_ -> _ ->
{ok, It} = emqx_ds:make_iterator( {ok, It} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
), ),
{NewProgressesAcc#{Stream => #{iterator => It, rank => Rank}}, OldProgressesAcc} {
NewStreamStatesAcc#{Stream => #{iterator => It, rank => Rank}},
OldStreamStatesAcc
}
end end
end, end,
{#{}, Progresses}, {#{}, StreamStates},
NewStreamsWRanks NewStreamsWRanks
). ).
@ -316,8 +328,8 @@ update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) ->
%% %%
%% If streams disappear after long leader sleep, it is a normal situation. %% If streams disappear after long leader sleep, it is a normal situation.
%% This removal will be a part of initialization before any agents connect. %% This removal will be a part of initialization before any agents connect.
removed_vanished_streams(Data0, VanishedProgresses) -> removed_vanished_streams(Data0, VanishedStreamStates) ->
VanishedStreams = maps:keys(VanishedProgresses), VanishedStreams = maps:keys(VanishedStreamStates),
Data1 = lists:foldl( Data1 = lists:foldl(
fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) -> fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) ->
case StreamOwners0 of case StreamOwners0 of
@ -608,33 +620,35 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
end. end.
update_stream_progresses( update_stream_progresses(
#{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data0, #{stream_states := StreamStates0, stream_owners := StreamOwners} = Data0,
Agent, Agent,
AgentState0, AgentState0,
ReceivedStreamProgresses ReceivedStreamProgresses
) -> ) ->
{StreamProgresses1, ReplayedStreams} = lists:foldl( {StreamStates1, ReplayedStreams} = lists:foldl(
fun(#{stream := Stream, iterator := It}, {ProgressesAcc, ReplayedStreamsAcc}) -> fun(#{stream := Stream, iterator := It}, {StreamStatesAcc, ReplayedStreamsAcc}) ->
case StreamOwners of case StreamOwners of
#{Stream := Agent} -> #{Stream := Agent} ->
StreamData0 = maps:get(Stream, ProgressesAcc), StreamData0 = maps:get(Stream, StreamStatesAcc),
case It of case It of
end_of_stream -> end_of_stream ->
Rank = maps:get(rank, StreamData0), Rank = maps:get(rank, StreamData0),
{maps:remove(Stream, ProgressesAcc), ReplayedStreamsAcc#{Stream => Rank}}; {maps:remove(Stream, StreamStatesAcc), ReplayedStreamsAcc#{
Stream => Rank
}};
_ -> _ ->
StreamData1 = StreamData0#{iterator => It}, StreamData1 = StreamData0#{iterator => It},
{ProgressesAcc#{Stream => StreamData1}, ReplayedStreamsAcc} {StreamStatesAcc#{Stream => StreamData1}, ReplayedStreamsAcc}
end; end;
_ -> _ ->
{ProgressesAcc, ReplayedStreamsAcc} {StreamStatesAcc, ReplayedStreamsAcc}
end end
end, end,
{StreamProgresses0, #{}}, {StreamStates0, #{}},
ReceivedStreamProgresses ReceivedStreamProgresses
), ),
Data1 = update_rank_progress(Data0, ReplayedStreams), Data1 = update_rank_progress(Data0, ReplayedStreams),
Data2 = Data1#{stream_progresses => StreamProgresses1}, Data2 = Data1#{stream_states => StreamStates1},
AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams), AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams),
{Data2, AgentState1}. {Data2, AgentState1}.
@ -864,8 +878,8 @@ gen_router_id() ->
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).
unassigned_streams(#{stream_progresses := StreamProgresses, stream_owners := StreamOwners}) -> unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) ->
Streams = maps:keys(StreamProgresses), Streams = maps:keys(StreamStates),
AssignedStreams = maps:keys(StreamOwners), AssignedStreams = maps:keys(StreamOwners),
Streams -- AssignedStreams. Streams -- AssignedStreams.
@ -887,12 +901,12 @@ desired_stream_count_per_agent(#{agents := AgentStates} = Data) ->
desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) -> desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) ->
desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1). desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1).
desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> desired_stream_count_per_agent(#{stream_states := StreamStates}, AgentCount) ->
case AgentCount of case AgentCount of
0 -> 0 ->
0; 0;
_ -> _ ->
StreamCount = maps:size(StreamProgresses), StreamCount = maps:size(StreamStates),
case StreamCount rem AgentCount of case StreamCount rem AgentCount of
0 -> 0 ->
StreamCount div AgentCount; StreamCount div AgentCount;
@ -901,10 +915,10 @@ desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCo
end end
end. end.
stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) -> stream_progresses(#{stream_states := StreamStates} = _Data, Streams) ->
lists:map( lists:map(
fun(Stream) -> fun(Stream) ->
StreamData = maps:get(Stream, StreamProgresses), StreamData = maps:get(Stream, StreamStates),
#{ #{
stream => Stream, stream => Stream,
iterator => maps:get(iterator, StreamData) iterator => maps:get(iterator, StreamData)