feat(queue): rearrange leader's code

This commit is contained in:
Ilya Averyanov 2024-07-10 22:25:13 +03:00
parent b8e8f7c8e0
commit 9307a82004
1 changed files with 85 additions and 94 deletions

View File

@ -198,7 +198,6 @@ handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0)
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}}; {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}};
%% drop_timeout timer %% drop_timeout timer
handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) -> handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}),
Data1 = drop_timeout_agents(Data0), Data1 = drop_timeout_agents(Data0),
{keep_state, Data1, {keep_state, Data1,
{{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}}; {{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}};
@ -207,7 +206,6 @@ handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0)
handle_event( handle_event(
info, ?agent_connect_leader_match(Agent, AgentMetadata, _TopicFilter), ?leader_active, Data0 info, ?agent_connect_leader_match(Agent, AgentMetadata, _TopicFilter), ?leader_active, Data0
) -> ) ->
% ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}),
Data1 = connect_agent(Data0, Agent, AgentMetadata), Data1 = connect_agent(Data0, Agent, AgentMetadata),
{keep_state, Data1}; {keep_state, Data1};
handle_event( handle_event(
@ -216,7 +214,6 @@ handle_event(
?leader_active, ?leader_active,
Data0 Data0
) -> ) ->
% ?tp(warning, shared_sub_leader_update_stream_states, #{agent => Agent, version => Version}),
Data1 = with_agent(Data0, Agent, fun() -> Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, Version) update_agent_stream_states(Data0, Agent, StreamProgresses, Version)
end), end),
@ -227,9 +224,6 @@ handle_event(
?leader_active, ?leader_active,
Data0 Data0
) -> ) ->
% ?tp(warning, shared_sub_leader_update_stream_states, #{
% agent => Agent, version_old => VersionOld, version_new => VersionNew
% }),
Data1 = with_agent(Data0, Agent, fun() -> Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew) update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew)
end), end),
@ -240,9 +234,6 @@ handle_event(
?leader_active, ?leader_active,
Data0 Data0
) -> ) ->
% ?tp(warning, shared_sub_leader_disconnect, #{
% agent => Agent, version => Version
% }),
Data1 = with_agent(Data0, Agent, fun() -> Data1 = with_agent(Data0, Agent, fun() ->
disconnect_agent(Data0, Agent, StreamProgresses, Version) disconnect_agent(Data0, Agent, StreamProgresses, Version)
end), end),
@ -463,6 +454,69 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
UnassignedStreams = unassigned_streams(Data0), UnassignedStreams = unassigned_streams(Data0),
lists:sublist(shuffle(UnassignedStreams), AssignCount). lists:sublist(shuffle(UnassignedStreams), AssignCount).
%%--------------------------------------------------------------------
%% renew_leases - send lease confirmations to agents
renew_leases(#{agents := AgentStates} = Data) ->
?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
ok = lists:foreach(
fun({Agent, AgentState}) ->
renew_lease(Data, Agent, AgentState)
end,
maps:to_list(AgentStates)
),
Data.
renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId} = Data, Agent, #{
streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion
}) ->
StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, GroupId, PrevVersion, Version, StreamProgresses
),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version);
renew_lease(#{group_id := GroupId}, Agent, #{
state := ?updating, version := Version, prev_version := PrevVersion
}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version).
%%--------------------------------------------------------------------
%% Drop agents that stopped reporting progress
drop_timeout_agents(#{agents := Agents} = Data) ->
Now = now_ms_monotonic(),
lists:foldl(
fun(
{Agent,
#{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} =
_AgentState},
DataAcc
) ->
case
(UpdateDeadline < Now) orelse
(is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
of
true ->
?SLOG(info, #{
msg => leader_agent_timeout,
now => Now,
update_deadline => UpdateDeadline,
not_replaying_deadline => NoReplayingDeadline,
agent => Agent
}),
drop_invalidate_agent(DataAcc, Agent);
false ->
DataAcc
end
end,
Data,
maps:to_list(Agents)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle a newly connected agent %% Handle a newly connected agent
@ -519,91 +573,6 @@ reconnect_agent(
Data2 = unassign_streams(Data1, OldRevokedStreams), Data2 = unassign_streams(Data1, OldRevokedStreams),
Data2. Data2.
%%--------------------------------------------------------------------
%% Disconnect agent gracefully
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
case get_agent_state(Data0, Agent) of
#{version := Version} ->
?tp(warning, shared_sub_leader_disconnect_agent, #{
agent => Agent,
version => Version
}),
Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version),
Data2 = drop_agent(Data1, Agent),
Data2;
_ ->
?tp(warning, shared_sub_leader_unexpected_disconnect, #{
agent => Agent,
version => Version
}),
Data1 = drop_agent(Data0, Agent),
Data1
end.
%%--------------------------------------------------------------------
%% Drop agents that stopped reporting progress
drop_timeout_agents(#{agents := Agents} = Data) ->
Now = now_ms_monotonic(),
lists:foldl(
fun(
{Agent,
#{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} =
_AgentState},
DataAcc
) ->
case
(UpdateDeadline < Now) orelse
(is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
of
true ->
?SLOG(info, #{
msg => leader_agent_timeout,
now => Now,
update_deadline => UpdateDeadline,
not_replaying_deadline => NoReplayingDeadline,
agent => Agent
}),
drop_invalidate_agent(DataAcc, Agent);
false ->
DataAcc
end
end,
Data,
maps:to_list(Agents)
).
%%--------------------------------------------------------------------
%% Send lease confirmations to agents
renew_leases(#{agents := AgentStates} = Data) ->
?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
ok = lists:foreach(
fun({Agent, AgentState}) ->
renew_lease(Data, Agent, AgentState)
end,
maps:to_list(AgentStates)
),
Data.
renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId} = Data, Agent, #{
streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion
}) ->
StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, GroupId, PrevVersion, Version, StreamProgresses
),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version);
renew_lease(#{group_id := GroupId}, Agent, #{
state := ?updating, version := Version, prev_version := PrevVersion
}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle stream progress updates from agent in replaying state %% Handle stream progress updates from agent in replaying state
@ -801,6 +770,28 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
drop_invalidate_agent(Data0, Agent) drop_invalidate_agent(Data0, Agent)
end. end.
%%--------------------------------------------------------------------
%% Disconnect agent gracefully
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
case get_agent_state(Data0, Agent) of
#{version := Version} ->
?tp(warning, shared_sub_leader_disconnect_agent, #{
agent => Agent,
version => Version
}),
Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version),
Data2 = drop_agent(Data1, Agent),
Data2;
_ ->
?tp(warning, shared_sub_leader_unexpected_disconnect, #{
agent => Agent,
version => Version
}),
Data1 = drop_agent(Data0, Agent),
Data1
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Agent state transitions %% Agent state transitions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------