diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 3a2081f1b..24b78155f 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -198,7 +198,6 @@ handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}}; %% drop_timeout timer handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) -> - % ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}), Data1 = drop_timeout_agents(Data0), {keep_state, Data1, {{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}}; @@ -207,7 +206,6 @@ handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) handle_event( info, ?agent_connect_leader_match(Agent, AgentMetadata, _TopicFilter), ?leader_active, Data0 ) -> - % ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}), Data1 = connect_agent(Data0, Agent, AgentMetadata), {keep_state, Data1}; handle_event( @@ -216,7 +214,6 @@ handle_event( ?leader_active, Data0 ) -> - % ?tp(warning, shared_sub_leader_update_stream_states, #{agent => Agent, version => Version}), Data1 = with_agent(Data0, Agent, fun() -> update_agent_stream_states(Data0, Agent, StreamProgresses, Version) end), @@ -227,9 +224,6 @@ handle_event( ?leader_active, Data0 ) -> - % ?tp(warning, shared_sub_leader_update_stream_states, #{ - % agent => Agent, version_old => VersionOld, version_new => VersionNew - % }), Data1 = with_agent(Data0, Agent, fun() -> update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew) end), @@ -240,9 +234,6 @@ handle_event( ?leader_active, Data0 ) -> - % ?tp(warning, shared_sub_leader_disconnect, #{ - % agent => Agent, version => Version - % }), Data1 = with_agent(Data0, Agent, fun() -> disconnect_agent(Data0, Agent, StreamProgresses, Version) end), @@ -463,6 +454,69 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> UnassignedStreams = unassigned_streams(Data0), lists:sublist(shuffle(UnassignedStreams), AssignCount). +%%-------------------------------------------------------------------- +%% renew_leases - send lease confirmations to agents + +renew_leases(#{agents := AgentStates} = Data) -> + ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), + ok = lists:foreach( + fun({Agent, AgentState}) -> + renew_lease(Data, Agent, AgentState) + end, + maps:to_list(AgentStates) + ), + Data. + +renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); +renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); +renew_lease(#{group_id := GroupId} = Data, Agent, #{ + streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion +}) -> + StreamProgresses = stream_progresses(Data, Streams), + ok = emqx_ds_shared_sub_proto:leader_update_streams( + Agent, GroupId, PrevVersion, Version, StreamProgresses + ), + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version); +renew_lease(#{group_id := GroupId}, Agent, #{ + state := ?updating, version := Version, prev_version := PrevVersion +}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version). + +%%-------------------------------------------------------------------- +%% Drop agents that stopped reporting progress + +drop_timeout_agents(#{agents := Agents} = Data) -> + Now = now_ms_monotonic(), + lists:foldl( + fun( + {Agent, + #{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} = + _AgentState}, + DataAcc + ) -> + case + (UpdateDeadline < Now) orelse + (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now) + of + true -> + ?SLOG(info, #{ + msg => leader_agent_timeout, + now => Now, + update_deadline => UpdateDeadline, + not_replaying_deadline => NoReplayingDeadline, + agent => Agent + }), + drop_invalidate_agent(DataAcc, Agent); + false -> + DataAcc + end + end, + Data, + maps:to_list(Agents) + ). + %%-------------------------------------------------------------------- %% Handle a newly connected agent @@ -519,91 +573,6 @@ reconnect_agent( Data2 = unassign_streams(Data1, OldRevokedStreams), Data2. -%%-------------------------------------------------------------------- -%% Disconnect agent gracefully - -disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> - case get_agent_state(Data0, Agent) of - #{version := Version} -> - ?tp(warning, shared_sub_leader_disconnect_agent, #{ - agent => Agent, - version => Version - }), - Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version), - Data2 = drop_agent(Data1, Agent), - Data2; - _ -> - ?tp(warning, shared_sub_leader_unexpected_disconnect, #{ - agent => Agent, - version => Version - }), - Data1 = drop_agent(Data0, Agent), - Data1 - end. - -%%-------------------------------------------------------------------- -%% Drop agents that stopped reporting progress - -drop_timeout_agents(#{agents := Agents} = Data) -> - Now = now_ms_monotonic(), - lists:foldl( - fun( - {Agent, - #{update_deadline := UpdateDeadline, not_replaying_deadline := NoReplayingDeadline} = - _AgentState}, - DataAcc - ) -> - case - (UpdateDeadline < Now) orelse - (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now) - of - true -> - ?SLOG(info, #{ - msg => leader_agent_timeout, - now => Now, - update_deadline => UpdateDeadline, - not_replaying_deadline => NoReplayingDeadline, - agent => Agent - }), - drop_invalidate_agent(DataAcc, Agent); - false -> - DataAcc - end - end, - Data, - maps:to_list(Agents) - ). - -%%-------------------------------------------------------------------- -%% Send lease confirmations to agents - -renew_leases(#{agents := AgentStates} = Data) -> - ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), - ok = lists:foreach( - fun({Agent, AgentState}) -> - renew_lease(Data, Agent, AgentState) - end, - maps:to_list(AgentStates) - ), - Data. - -renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); -renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); -renew_lease(#{group_id := GroupId} = Data, Agent, #{ - streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion -}) -> - StreamProgresses = stream_progresses(Data, Streams), - ok = emqx_ds_shared_sub_proto:leader_update_streams( - Agent, GroupId, PrevVersion, Version, StreamProgresses - ), - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version); -renew_lease(#{group_id := GroupId}, Agent, #{ - state := ?updating, version := Version, prev_version := PrevVersion -}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version). - %%-------------------------------------------------------------------- %% Handle stream progress updates from agent in replaying state @@ -801,6 +770,28 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers drop_invalidate_agent(Data0, Agent) end. +%%-------------------------------------------------------------------- +%% Disconnect agent gracefully + +disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> + case get_agent_state(Data0, Agent) of + #{version := Version} -> + ?tp(warning, shared_sub_leader_disconnect_agent, #{ + agent => Agent, + version => Version + }), + Data1 = update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version), + Data2 = drop_agent(Data1, Agent), + Data2; + _ -> + ?tp(warning, shared_sub_leader_unexpected_disconnect, #{ + agent => Agent, + version => Version + }), + Data1 = drop_agent(Data0, Agent), + Data1 + end. + %%-------------------------------------------------------------------- %% Agent state transitions %%--------------------------------------------------------------------