diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 510d6a45f..de277ece8 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -38,7 +38,7 @@ -define(updating, updating). -type agent_state() :: #{ - %% Our view of group gm's status + %% Our view of group sm's status %% it lags the actual state state := ?waiting_replaying | ?replaying | ?waiting_updating | ?updating, prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), @@ -109,6 +109,7 @@ -define(DROP_TIMEOUT_INTERVAL, 1000). -define(AGENT_TIMEOUT, 5000). +-define(MAX_NOT_REPLAYING, 5000). -define(START_TIME_THRESHOLD, 5000). @@ -535,13 +536,24 @@ disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> %% Drop agents that stopped reporting progress drop_timeout_agents(#{agents := Agents} = Data) -> - Now = now_ms(), + Now = now_ms_monotonic(), lists:foldl( - fun({Agent, #{update_deadline := Deadline} = _AgentState}, DataAcc) -> - case Deadline < Now of + fun( + {Agent, + #{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} = + _AgentState}, + DataAcc + ) -> + case + (UpdateDeadline < Now) orelse + (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now) + of true -> ?SLOG(info, #{ msg => leader_agent_timeout, + now => Now, + update_deadline => UpdateDeadline, + not_replaying_deadline => NoReplayingDeadline, agent => Agent }), drop_invalidate_agent(DataAcc, Agent); @@ -805,11 +817,12 @@ agent_transition_to_waiting_updating( prev_version => Version, version => NewVersion }, + AgentState2 = renew_no_replaying_deadline(AgentState1), StreamProgresses = stream_progresses(Data, Streams), ok = emqx_ds_shared_sub_proto:leader_update_streams( Agent, Group, Version, NewVersion, StreamProgresses ), - AgentState1. + AgentState2. agent_transition_to_waiting_replaying( #{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 @@ -820,10 +833,11 @@ agent_transition_to_waiting_replaying( new_state => ?waiting_replaying }), ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version), - AgentState0#{ + AgentState1 = AgentState0#{ state => ?waiting_replaying, revoked_streams => [] - }. + }, + renew_no_replaying_deadline(AgentState1). agent_transition_to_initial_waiting_replaying( #{group := Group} = Data, Agent, AgentMetadata, InitialStreams @@ -839,15 +853,16 @@ agent_transition_to_initial_waiting_replaying( ok = emqx_ds_shared_sub_proto:leader_lease_streams( Agent, Group, Leader, StreamProgresses, Version ), - #{ + AgentState = #{ metadata => AgentMetadata, state => ?waiting_replaying, version => Version, prev_version => undefined, streams => InitialStreams, revoked_streams => [], - update_deadline => now_ms() + ?AGENT_TIMEOUT - }. + update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT + }, + renew_no_replaying_deadline(AgentState). agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) -> ?tp(warning, shared_sub_leader_agent_state_transition, #{ @@ -857,16 +872,18 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState }), AgentState#{ state => ?replaying, - prev_version => undefined + prev_version => undefined, + not_replaying_deadline => undefined }. -agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState) -> +agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) -> ?tp(warning, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => ?waiting_updating, new_state => ?updating }), - AgentState#{state => ?updating}. + AgentState1 = AgentState0#{state => ?updating}, + renew_no_replaying_deadline(AgentState1). %%-------------------------------------------------------------------- %% Helper functions @@ -878,6 +895,20 @@ gen_router_id() -> now_ms() -> erlang:system_time(millisecond). +now_ms_monotonic() -> + erlang:monotonic_time(millisecond). + +renew_no_replaying_deadline(#{not_replaying_deadline := undefined} = AgentState) -> + AgentState#{ + not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING + }; +renew_no_replaying_deadline(#{not_replaying_deadline := _Deadline} = AgentState) -> + AgentState; +renew_no_replaying_deadline(#{} = AgentState) -> + AgentState#{ + not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING + }. + unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) -> Streams = maps:keys(StreamStates), AssignedStreams = maps:keys(StreamOwners), @@ -960,7 +991,7 @@ set_agent_state(#{agents := Agents} = Data, Agent, AgentState) -> update_agent_timeout(AgentState) -> AgentState#{ - update_deadline => now_ms() + ?AGENT_TIMEOUT + update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT }. get_agent_state(#{agents := Agents} = _Data, Agent) ->