feat(queue): kick agents that do not return to the replaying state for too long

This commit is contained in:
Ilya Averyanov 2024-07-04 21:24:21 +03:00
parent 1496f7f778
commit 649cf88042
1 changed files with 45 additions and 14 deletions

View File

@ -38,7 +38,7 @@
-define(updating, updating). -define(updating, updating).
-type agent_state() :: #{ -type agent_state() :: #{
%% Our view of group gm's status %% Our view of group sm's status
%% it lags the actual state %% it lags the actual state
state := ?waiting_replaying | ?replaying | ?waiting_updating | ?updating, state := ?waiting_replaying | ?replaying | ?waiting_updating | ?updating,
prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
@ -109,6 +109,7 @@
-define(DROP_TIMEOUT_INTERVAL, 1000). -define(DROP_TIMEOUT_INTERVAL, 1000).
-define(AGENT_TIMEOUT, 5000). -define(AGENT_TIMEOUT, 5000).
-define(MAX_NOT_REPLAYING, 5000).
-define(START_TIME_THRESHOLD, 5000). -define(START_TIME_THRESHOLD, 5000).
@ -535,13 +536,24 @@ disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
%% Drop agents that stopped reporting progress %% Drop agents that stopped reporting progress
drop_timeout_agents(#{agents := Agents} = Data) -> drop_timeout_agents(#{agents := Agents} = Data) ->
Now = now_ms(), Now = now_ms_monotonic(),
lists:foldl( lists:foldl(
fun({Agent, #{update_deadline := Deadline} = _AgentState}, DataAcc) -> fun(
case Deadline < Now of {Agent,
#{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} =
_AgentState},
DataAcc
) ->
case
(UpdateDeadline < Now) orelse
(is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
of
true -> true ->
?SLOG(info, #{ ?SLOG(info, #{
msg => leader_agent_timeout, msg => leader_agent_timeout,
now => Now,
update_deadline => UpdateDeadline,
not_replaying_deadline => NoReplayingDeadline,
agent => Agent agent => Agent
}), }),
drop_invalidate_agent(DataAcc, Agent); drop_invalidate_agent(DataAcc, Agent);
@ -805,11 +817,12 @@ agent_transition_to_waiting_updating(
prev_version => Version, prev_version => Version,
version => NewVersion version => NewVersion
}, },
AgentState2 = renew_no_replaying_deadline(AgentState1),
StreamProgresses = stream_progresses(Data, Streams), StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams( ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, Group, Version, NewVersion, StreamProgresses Agent, Group, Version, NewVersion, StreamProgresses
), ),
AgentState1. AgentState2.
agent_transition_to_waiting_replaying( agent_transition_to_waiting_replaying(
#{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 #{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
@ -820,10 +833,11 @@ agent_transition_to_waiting_replaying(
new_state => ?waiting_replaying new_state => ?waiting_replaying
}), }),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version), ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version),
AgentState0#{ AgentState1 = AgentState0#{
state => ?waiting_replaying, state => ?waiting_replaying,
revoked_streams => [] revoked_streams => []
}. },
renew_no_replaying_deadline(AgentState1).
agent_transition_to_initial_waiting_replaying( agent_transition_to_initial_waiting_replaying(
#{group := Group} = Data, Agent, AgentMetadata, InitialStreams #{group := Group} = Data, Agent, AgentMetadata, InitialStreams
@ -839,15 +853,16 @@ agent_transition_to_initial_waiting_replaying(
ok = emqx_ds_shared_sub_proto:leader_lease_streams( ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, Leader, StreamProgresses, Version Agent, Group, Leader, StreamProgresses, Version
), ),
#{ AgentState = #{
metadata => AgentMetadata, metadata => AgentMetadata,
state => ?waiting_replaying, state => ?waiting_replaying,
version => Version, version => Version,
prev_version => undefined, prev_version => undefined,
streams => InitialStreams, streams => InitialStreams,
revoked_streams => [], revoked_streams => [],
update_deadline => now_ms() + ?AGENT_TIMEOUT update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT
}. },
renew_no_replaying_deadline(AgentState).
agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) -> agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{ ?tp(warning, shared_sub_leader_agent_state_transition, #{
@ -857,16 +872,18 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState
}), }),
AgentState#{ AgentState#{
state => ?replaying, state => ?replaying,
prev_version => undefined prev_version => undefined,
not_replaying_deadline => undefined
}. }.
agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState) -> agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{ ?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent, agent => Agent,
old_state => ?waiting_updating, old_state => ?waiting_updating,
new_state => ?updating new_state => ?updating
}), }),
AgentState#{state => ?updating}. AgentState1 = AgentState0#{state => ?updating},
renew_no_replaying_deadline(AgentState1).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
@ -878,6 +895,20 @@ gen_router_id() ->
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).
%% Returns the current Erlang monotonic time, in milliseconds.
%% NOTE(review): monotonic values are only meaningful relative to other
%% monotonic readings taken on the same node (they may be negative), so
%% they must not be mixed with wall-clock values from now_ms/0.
now_ms_monotonic() ->
erlang:monotonic_time(millisecond).
%% Arms the "not replaying" deadline of an agent state, unless one is
%% already armed.
%%
%% * If `not_replaying_deadline' is absent or `undefined', it is set to
%%   the current monotonic time plus ?MAX_NOT_REPLAYING.
%% * If it already holds any other value, the state is returned untouched,
%%   so repeated calls do not push an existing deadline further away.
renew_no_replaying_deadline(AgentState) when is_map(AgentState) ->
    %% A missing key is treated exactly like an explicit `undefined'.
    case maps:get(not_replaying_deadline, AgentState, undefined) of
        undefined ->
            Deadline = now_ms_monotonic() + ?MAX_NOT_REPLAYING,
            AgentState#{not_replaying_deadline => Deadline};
        _AlreadyArmed ->
            AgentState
    end.
unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) -> unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) ->
Streams = maps:keys(StreamStates), Streams = maps:keys(StreamStates),
AssignedStreams = maps:keys(StreamOwners), AssignedStreams = maps:keys(StreamOwners),
@ -960,7 +991,7 @@ set_agent_state(#{agents := Agents} = Data, Agent, AgentState) ->
update_agent_timeout(AgentState) -> update_agent_timeout(AgentState) ->
AgentState#{ AgentState#{
update_deadline => now_ms() + ?AGENT_TIMEOUT update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT
}. }.
get_agent_state(#{agents := Agents} = _Data, Agent) -> get_agent_state(#{agents := Agents} = _Data, Agent) ->