feat(queue): implement stream finalization

This commit is contained in:
Ilya Averyanov 2024-07-03 22:48:04 +03:00
parent e5547005eb
commit 7d004b37da
2 changed files with 287 additions and 65 deletions

View File

@ -58,14 +58,18 @@
%% TODO https://emqx.atlassian.net/browse/EMQX-12575
%% Implement some stats to assign evenly?
stream_progresses := #{
emqx_ds:stream() => emqx_ds:iterator()
emqx_ds:stream() => #{
iterator => emqx_ds:iterator(),
rank => emqx_ds:stream_rank()
}
},
agents := #{
emqx_ds_shared_sub_proto:agent() => agent_state()
},
stream_owners := #{
emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
}
},
rank_progress := emqx_ds_shared_sub_leader_rank_progress:t()
}.
-export_type([
@ -139,7 +143,8 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
start_time => now_ms() - ?START_TIME_THRESHOLD,
stream_progresses => #{},
stream_owners => #{},
agents => #{}
agents => #{},
rank_progress => emqx_ds_shared_sub_leader_rank_progress:init()
},
{ok, ?leader_waiting_registration, Data}.
@ -254,37 +259,87 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
%% * Revoke streams from agents having too many streams
%% * Assign streams to agents having too few streams
renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic := Topic} = Data0) ->
renew_streams(
#{
start_time := StartTime,
stream_progresses := Progresses,
topic := Topic,
rank_progress := RankProgress0
} = Data0
) ->
TopicFilter = emqx_topic:words(Topic),
{_, Streams} = lists:unzip(
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
StreamsWRanks = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
%% Discard streams that are already replayed and init new
{NewStreamsWRanks, RankProgress1} = emqx_ds_shared_sub_leader_rank_progress:add_streams(
StreamsWRanks, RankProgress0
),
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% Handle stream removal
NewProgresses = lists:foldl(
fun(Stream, ProgressesAcc) ->
case ProgressesAcc of
#{Stream := _} ->
ProgressesAcc;
{NewProgresses, VanishedProgresses} = update_progresses(
Progresses, NewStreamsWRanks, TopicFilter, StartTime
),
Data1 = removed_vanished_streams(Data0, VanishedProgresses),
Data2 = Data1#{stream_progresses => NewProgresses, rank_progress => RankProgress1},
Data3 = revoke_streams(Data2),
Data4 = assign_streams(Data3),
?SLOG(info, #{
msg => leader_renew_streams,
topic_filter => TopicFilter,
new_streams => length(NewStreamsWRanks)
}),
Data4.
update_progresses(Progresses, NewStreamsWRanks, TopicFilter, StartTime) ->
lists:foldl(
fun({Rank, Stream}, {NewProgressesAcc, OldProgressesAcc}) ->
case OldProgressesAcc of
#{Stream := StreamData} ->
{
NewProgressesAcc#{Stream => StreamData},
maps:remove(Stream, OldProgressesAcc)
};
_ ->
{ok, It} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),
ProgressesAcc#{Stream => It}
{NewProgressesAcc#{Stream => #{iterator => It, rank => Rank}}, OldProgressesAcc}
end
end,
Progresses,
Streams
{#{}, Progresses},
NewStreamsWRanks
).
%% We just remove disappeared streams from anywhere.
%%
%% If streams disappear from DS during leader being in replaying state
%% this is an abnormal situation (we should receive `end_of_stream` first),
%% but clients clients are unlikely to report any progress on them.
%%
%% If streams disappear after long leader sleep, it is a normal situation.
%% This removal will be a part of initialization before any agents connect.
removed_vanished_streams(Data0, VanishedProgresses) ->
VanishedStreams = maps:keys(VanishedProgresses),
Data1 = lists:foldl(
fun(Stream, #{stream_owners := StreamOwners0} = DataAcc) ->
case StreamOwners0 of
#{Stream := Agent} ->
#{streams := Streams0, revoked_streams := RevokedStreams0} =
AgentState0 = get_agent_state(Data0, Agent),
Streams1 = Streams0 -- [Stream],
RevokedStreams1 = RevokedStreams0 -- [Stream],
AgentState1 = AgentState0#{
streams => Streams1,
revoked_streams => RevokedStreams1
},
set_agent_state(DataAcc, Agent, AgentState1);
_ ->
DataAcc
end
end,
Data0,
VanishedStreams
),
Data1 = Data0#{stream_progresses => NewProgresses},
?SLOG(info, #{
msg => leader_renew_streams,
topic_filter => TopicFilter,
streams => length(Streams)
}),
Data2 = revoke_streams(Data1),
Data3 = assign_streams(Data2),
Data3.
Data2 = unassign_streams(Data1, VanishedStreams),
Data2.
%% We revoke streams from agents that have too many streams (> desired_stream_count_per_agent).
%% We revoke only from replaying agents.
@ -528,15 +583,19 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
Data0;
{?waiting_replaying, AgentVersion} ->
%% Agent finished updating, now replaying
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
AgentState2 = agent_transition_to_replaying(Agent, AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState2 = update_agent_timeout(AgentState1),
AgentState3 = agent_transition_to_replaying(Agent, AgentState2),
set_agent_state(Data1, Agent, AgentState3);
{?replaying, AgentVersion} ->
%% Common case, agent is replaying
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
set_agent_state(Data1, Agent, AgentState1);
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState2 = update_agent_timeout(AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{OtherState, OtherVersion} ->
?tp(warning, unexpected_update, #{
agent => Agent,
@ -549,24 +608,63 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
end.
update_stream_progresses(
#{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data,
#{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data0,
Agent,
AgentState0,
ReceivedStreamProgresses
) ->
StreamProgresses1 = lists:foldl(
fun(#{stream := Stream, iterator := It}, ProgressesAcc) ->
{StreamProgresses1, ReplayedStreams} = lists:foldl(
fun(#{stream := Stream, iterator := It}, {ProgressesAcc, ReplayedStreamsAcc}) ->
case StreamOwners of
#{Stream := Agent} ->
ProgressesAcc#{Stream => It};
StreamData0 = maps:get(Stream, ProgressesAcc),
case It of
end_of_stream ->
Rank = maps:get(rank, StreamData0),
{maps:remove(Stream, ProgressesAcc), ReplayedStreamsAcc#{Stream => Rank}};
_ ->
StreamData1 = StreamData0#{iterator => It},
{ProgressesAcc#{Stream => StreamData1}, ReplayedStreamsAcc}
end;
_ ->
ProgressesAcc
{ProgressesAcc, ReplayedStreamsAcc}
end
end,
StreamProgresses0,
{StreamProgresses0, #{}},
ReceivedStreamProgresses
),
Data#{
stream_progresses => StreamProgresses1
Data1 = update_rank_progress(Data0, ReplayedStreams),
Data2 = Data1#{stream_progresses => StreamProgresses1},
AgentState1 = filter_replayed_streams(AgentState0, ReplayedStreams),
{Data2, AgentState1}.
update_rank_progress(#{rank_progress := RankProgress0} = Data, ReplayedStreams) ->
RankProgress1 = maps:fold(
fun(Stream, Rank, RankProgressAcc) ->
emqx_ds_shared_sub_leader_rank_progress:set_replayed({Rank, Stream}, RankProgressAcc)
end,
RankProgress0,
ReplayedStreams
),
Data#{rank_progress => RankProgress1}.
%% No need to revoke fully replayed streams. We do not assign them anymore.
%% The agent's session also will drop replayed streams itself.
filter_replayed_streams(
#{streams := Streams0, revoked_streams := RevokedStreams0} = AgentState0,
ReplayedStreams
) ->
Streams1 = lists:filter(
fun(Stream) -> not maps:is_key(Stream, ReplayedStreams) end,
Streams0
),
RevokedStreams1 = lists:filter(
fun(Stream) -> not maps:is_key(Stream, ReplayedStreams) end,
RevokedStreams0
),
AgentState0#{
streams => Streams1,
revoked_streams => RevokedStreams1
}.
clean_revoked_streams(
@ -613,41 +711,49 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
case {State, VersionOld, VersionNew} of
{?waiting_updating, AgentPrevVersion, AgentVersion} ->
%% Client started updating
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
{AgentState2, Data2} = clean_revoked_streams(
Data1, Agent, AgentState1, AgentStreamProgresses
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState3 =
case AgentState2 of
AgentState2 = update_agent_timeout(AgentState1),
{AgentState3, Data2} = clean_revoked_streams(
Data1, Agent, AgentState2, AgentStreamProgresses
),
AgentState4 =
case AgentState3 of
#{revoked_streams := []} ->
agent_transition_to_waiting_replaying(Data1, Agent, AgentState2);
agent_transition_to_waiting_replaying(Data1, Agent, AgentState3);
_ ->
agent_transition_to_updating(Agent, AgentState2)
agent_transition_to_updating(Agent, AgentState3)
end,
set_agent_state(Data2, Agent, AgentState3);
set_agent_state(Data2, Agent, AgentState4);
{?updating, AgentPrevVersion, AgentVersion} ->
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
{AgentState2, Data2} = clean_revoked_streams(
Data1, Agent, AgentState1, AgentStreamProgresses
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState3 =
case AgentState2 of
AgentState2 = update_agent_timeout(AgentState1),
{AgentState3, Data2} = clean_revoked_streams(
Data1, Agent, AgentState2, AgentStreamProgresses
),
AgentState4 =
case AgentState3 of
#{revoked_streams := []} ->
agent_transition_to_waiting_replaying(Data1, Agent, AgentState2);
agent_transition_to_waiting_replaying(Data1, Agent, AgentState3);
_ ->
AgentState2
AgentState3
end,
set_agent_state(Data2, Agent, AgentState3);
set_agent_state(Data2, Agent, AgentState4);
{?waiting_replaying, _, AgentVersion} ->
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
set_agent_state(Data1, Agent, AgentState1);
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState2 = update_agent_timeout(AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{?replaying, _, AgentVersion} ->
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
set_agent_state(Data1, Agent, AgentState1);
{Data1, AgentState1} = update_stream_progresses(
Data0, Agent, AgentState0, AgentStreamProgresses
),
AgentState2 = update_agent_timeout(AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{OtherState, OtherVersionOld, OtherVersionNew} ->
?tp(warning, unexpected_update, #{
agent => Agent,
@ -798,9 +904,10 @@ desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCo
stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) ->
lists:map(
fun(Stream) ->
StreamData = maps:get(Stream, StreamProgresses),
#{
stream => Stream,
iterator => maps:get(Stream, StreamProgresses)
iterator => maps:get(iterator, StreamData)
}
end,
Streams

View File

@ -0,0 +1,115 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_leader_rank_progress).
-include_lib("emqx/include/logger.hrl").
-export([
init/0,
set_replayed/2,
add_streams/2
]).
%% "shard"
-type rank_x() :: emqx_ds:rank_x().
%% "generation"
-type rank_y() :: emqx_ds:rank_y().
%% shard progress
-type x_progress() :: #{
%% All streams with given rank_x and rank_y =< min_y are replayed.
min_y := rank_y(),
ys := #{
rank_y() => #{
emqx_ds:stream() => _IdReplayed :: boolean()
}
}
}.
-type t() :: #{
rank_x() => x_progress()
}.
-spec init() -> t().
init() -> #{}.
-spec set_replayed(emqx_ds:stream_rank(), t()) -> t().
set_replayed({{RankX, RankY}, Stream}, State) ->
case State of
#{RankX := #{ys := #{RankY := #{Stream := false} = RankYStreams} = Ys0}} ->
Ys1 = Ys0#{RankY => RankYStreams#{Stream => true}},
{MinY, Ys2} = update_min_y(maps:to_list(Ys1)),
State#{RankX => #{min_y => MinY, ys => Ys2}};
_ ->
?SLOG(
warning,
leader_rank_progress_double_or_invalid_update,
#{
rank_x => RankX,
rank_y => RankY,
state => State
}
),
State
end.
-spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) -> false | {true, t()}.
add_streams(StreamsWithRanks, State) ->
SortedStreamsWithRanks = lists:sort(
fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) ->
RankY1 =< RankY2
end,
StreamsWithRanks
),
lists:foldl(
fun({Rank, Stream} = StreamWithRank, {StreamAcc, StateAcc0}) ->
case add_stream({Rank, Stream}, StateAcc0) of
{true, StateAcc1} ->
{[StreamWithRank | StreamAcc], StateAcc1};
false ->
{StreamAcc, StateAcc0}
end
end,
{[], State},
SortedStreamsWithRanks
).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
add_stream({{RankX, RankY}, Stream}, State0) ->
case State0 of
#{RankX := #{min_y := MinY}} when RankY =< MinY ->
false;
#{RankX := #{ys := #{RankY := #{Stream := true}}}} ->
false;
_ ->
XProgress = maps:get(RankX, State0, #{min_y => RankY - 1, ys => #{}}),
Ys0 = maps:get(ys, XProgress),
RankYStreams0 = maps:get(RankY, Ys0, #{}),
RankYStreams1 = RankYStreams0#{Stream => false},
Ys1 = Ys0#{RankY => RankYStreams1},
State1 = State0#{RankX => XProgress#{ys => Ys1}},
{true, State1}
end.
update_min_y([{RankY, RankYStreams} | Rest] = Ys) ->
case {has_unreplayed_streams(RankYStreams), Rest} of
{true, _} ->
{RankY, maps:from_list(Ys)};
{false, []} ->
{RankY - 1, #{}};
{false, _} ->
update_min_y(Rest)
end.
has_unreplayed_streams(RankYStreams) ->
lists:any(
fun(IsReplayed) -> not IsReplayed end,
maps:values(RankYStreams)
).