feat(queue): implement full protocol between agent and leader

This commit is contained in:
Ilya Averyanov 2024-06-26 14:19:06 +03:00
parent c831f0772f
commit 082514f557
6 changed files with 933 additions and 182 deletions

View File

@ -123,10 +123,12 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt
iterator => BeginIt,
use_finished => is_use_finished(S, SRS)
},
[StreamProgress | Acc]
end,
@ -336,3 +338,6 @@ agent_opts(#{session_id := SessionId}) ->
-dialyzer({nowarn_function, now_ms/0}).
%% Current system time, in milliseconds.
now_ms() ->
    erlang:system_time(millisecond).
%% A stream is no longer in use once its replay state is marked as
%% unsubscribed and the stream scheduler reports it as fully acked.
is_use_finished(S, #srs{unsubscribed = IsUnsubscribed} = StreamReplayState) ->
    IsUnsubscribed andalso
        emqx_persistent_session_ds_stream_scheduler:is_fully_acked(StreamReplayState, S).

View File

@ -56,20 +56,30 @@ on_unsubscribe(State, TopicFilter) ->
%% Renewing streams amounts to collecting pending stream events from the state.
renew_streams(State) when is_map(State) ->
    fetch_stream_events(State).
on_stream_progress(State, _StreamProgress) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% Send to leader
State.
%% Forward reported stream progresses to the group state machines.
%% Progresses are first grouped by share group, then each group's state
%% machine receives only its own progresses.
on_stream_progress(State, StreamProgresses) ->
    ForwardToGroup = fun({Group, GroupProgresses}, StateAcc) ->
        with_group_sm(StateAcc, Group, fun(GSM) ->
            emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses)
        end)
    end,
    GroupedProgresses = maps:to_list(stream_progresses_by_group(StreamProgresses)),
    lists:foldl(ForwardToGroup, State, GroupedProgresses).
on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) ->
on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) ->
?SLOG(info, #{
msg => leader_lease_streams,
group => Group,
streams => StreamProgresses,
version => Version
version => Version,
leader => Leader
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version)
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(
GSM, Leader, StreamProgresses, Version
)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
?SLOG(info, #{
@ -80,6 +90,37 @@ on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, VersionOld, VersionNew)) ->
?SLOG(info, #{
msg => leader_renew_stream_lease,
group => Group,
version_old => VersionOld,
version_new => VersionNew
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
end);
on_info(State, ?leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew)) ->
?SLOG(info, #{
msg => leader_update_streams,
group => Group,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_update_streams(
GSM, VersionOld, VersionNew, StreamsNew
)
end);
on_info(State, ?leader_invalidate_match(Group)) ->
?SLOG(info, #{
msg => leader_invalidate,
group => Group
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_invalidate(GSM)
end);
%% Generic messages sent by group_sm's to themselves (timeouts).
on_info(State, #message_to_group_sm{group = Group, message = Message}) ->
with_group_sm(State, Group, fun(GSM) ->
@ -156,3 +197,20 @@ with_group_sm(State, Group, Fun) ->
%% Error?
State
end.
%% Group stream progresses by the share group of their topic filter.
%% The `topic_filter' key is removed from every progress. Within a group the
%% progresses are accumulated by prepending, i.e. in reverse input order.
stream_progresses_by_group(StreamProgresses) ->
    AddToGroup = fun(#{topic_filter := #share{group = Group}} = Progress, Grouped) ->
        GroupProgress = maps:remove(topic_filter, Progress),
        Grouped#{Group => [GroupProgress | maps:get(Group, Grouped, [])]}
    end,
    lists:foldl(AddToGroup, #{}, StreamProgresses).

View File

@ -16,14 +16,24 @@
new/1,
%% Leader messages
handle_leader_lease_streams/3,
handle_leader_lease_streams/4,
handle_leader_renew_stream_lease/2,
handle_leader_renew_stream_lease/3,
handle_leader_update_streams/4,
handle_leader_invalidate/1,
%% Self-initiated messages
handle_info/2,
%% API
fetch_stream_events/1
fetch_stream_events/1,
handle_stream_progress/2
]).
-export_type([
group_sm/0,
options/0,
state/0
]).
-type options() :: #{
@ -32,7 +42,31 @@
send_after := fun((non_neg_integer(), term()) -> reference())
}.
%% Subscription states
%% Internal lease change event kept by the group state machine
%% (see the `stream_lease_events' field of `group_sm()'):
%% * `lease'  -- a stream together with the iterator to start from;
%% * `revoke' -- a stream only.
-type stream_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
iterator => emqx_ds:iterator()
}
| #{
type => revoke,
stream => emqx_ds:stream()
}.
%% Lease change event as returned by `fetch_stream_events/1': the same
%% `lease' / `revoke' shapes as `stream_lease_event()', each additionally
%% carrying the share topic filter.
-type external_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
iterator => emqx_ds:iterator(),
topic_filter => emqx_persistent_session_ds:share_topic_filter()
}
| #{
type => revoke,
stream => emqx_ds:stream(),
topic_filter => emqx_persistent_session_ds:share_topic_filter()
}.
%% GroupSM States
-define(connecting, connecting).
-define(replaying, replaying).
@ -40,26 +74,47 @@
-type state() :: ?connecting | ?replaying | ?updating.
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
state => state(),
state_data => map(),
state_timers => map()
-type connecting_data() :: #{}.
-type replaying_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => emqx_ds:iterator()},
version => emqx_ds_shared_sub_proto:version(),
prev_version => undefined
}.
-type updating_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => emqx_ds:iterator()},
version => emqx_ds_shared_sub_proto:version(),
prev_version => emqx_ds_shared_sub_proto:version()
}.
-type state_data() :: connecting_data() | replaying_data() | updating_data().
-record(state_timeout, {
id :: reference(),
name :: atom(),
message :: term()
}).
-record(timer, {
ref :: reference(),
id :: reference()
}).
-type timer_name() :: atom().
-type timer() :: #timer{}.
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
stream_lease_events => list(stream_lease_event()),
state => state(),
state_data => state_data(),
state_timers => #{timer_name() => timer()}
}.
%%-----------------------------------------------------------------------
%% Constants
%%-----------------------------------------------------------------------
@ -94,11 +149,12 @@ new(#{
},
transition(GSM0, ?connecting, #{}).
-spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}.
fetch_stream_events(
#{
state := ?replaying,
state := _State,
topic_filter := TopicFilter,
state_data := #{stream_lease_events := Events0} = Data
stream_lease_events := Events0
} = GSM
) ->
Events1 = lists:map(
@ -107,14 +163,7 @@ fetch_stream_events(
end,
Events0
),
{
GSM#{
state_data => Data#{stream_lease_events => []}
},
Events1
};
fetch_stream_events(GSM) ->
{GSM, []}.
{GSM#{stream_lease_events => []}, Events1}.
%%-----------------------------------------------------------------------
%% Event Handlers
@ -128,37 +177,23 @@ handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version
#{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version
) ->
?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
Streams = lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) ->
Acc#{Stream => It}
end,
#{},
StreamProgresses
),
StreamLeaseEvents = lists:map(
fun(#{stream := Stream, iterator := It}) ->
#{
type => lease,
stream => Stream,
iterator => It
}
end,
StreamProgresses
),
Streams = progresses_to_map(StreamProgresses),
StreamLeaseEvents = progresses_to_lease_events(StreamProgresses),
transition(
GSM0,
?replaying,
#{
leader => Leader,
streams => Streams,
stream_lease_events => StreamLeaseEvents,
prev_version => undefined,
version => Version
}
},
StreamLeaseEvents
);
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) ->
GSM.
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
@ -172,13 +207,6 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0
%% Enter callback of the `replaying' state: arm the lease renewal timeout.
handle_replaying(GSM) ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, Version
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
%% The lease was not renewed in time: emit a trace point and go back to
%% `connecting' with empty state data.
handle_renew_lease_timeout(GSM) ->
    ?tp(debug, renew_lease_timeout, #{}),
    transition(GSM, ?connecting, #{}).
@ -187,8 +215,140 @@ handle_renew_lease_timeout(GSM) ->
%% Updating state
%% Enter callback of the `updating' state: arm the lease renewal timeout.
handle_updating(GSM) ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
%%-----------------------------------------------------------------------
%% Common handlers
%% Handle the leader's request to switch the stream set from `VersionOld'
%% to `VersionNew'.
%%
%% * `replaying' state, current version equals `VersionOld':
%%   - streams offered by the leader that we do not hold yet are added and a
%%     `lease' event is produced for each of them (for streams we already
%%     hold, our own iterator is kept);
%%   - streams we hold that are absent from the leader's set are dropped and
%%     a `revoke' event is produced for each of them;
%%   - the state machine moves to `updating'.
%% * `updating' state, current version equals `VersionNew': the message is a
%%   repetition; only the lease renewal timeout is re-armed.
%% * Any other state/version combination: fall back to `connecting'.
handle_leader_update_streams(
    #{
        state := ?replaying,
        %% Per-state data is stored under the `state_data' key of the group
        %% state machine; matching on any other key never succeeds and sends
        %% every update to the reconnect clause at the bottom.
        state_data := #{streams := Streams0, version := VersionOld} = StateData
    } = GSM,
    VersionOld,
    VersionNew,
    StreamProgresses
) ->
    {AddEvents, Streams1} = lists:foldl(
        fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) ->
            case maps:is_key(Stream, StreamsAcc) of
                true ->
                    %% We prefer our own progress
                    {AddEventAcc, StreamsAcc};
                false ->
                    {
                        [#{type => lease, stream => Stream, iterator => It} | AddEventAcc],
                        StreamsAcc#{Stream => It}
                    }
            end
        end,
        {[], Streams0},
        StreamProgresses
    ),
    NewStreamMap = progresses_to_map(StreamProgresses),
    {RevokeEvents, Streams2} = lists:foldl(
        fun(Stream, {RevokeEventAcc, StreamsAcc}) ->
            case maps:is_key(Stream, NewStreamMap) of
                true ->
                    {RevokeEventAcc, StreamsAcc};
                false ->
                    {
                        [#{type => revoke, stream => Stream} | RevokeEventAcc],
                        maps:remove(Stream, StreamsAcc)
                    }
            end
        end,
        {[], Streams1},
        maps:keys(Streams1)
    ),
    StreamLeaseEvents = AddEvents ++ RevokeEvents,
    transition(
        GSM,
        ?updating,
        StateData#{
            streams => Streams2,
            prev_version => VersionOld,
            version => VersionNew
        },
        StreamLeaseEvents
    );
handle_leader_update_streams(
    #{
        state := ?updating,
        state_data := #{version := VersionNew} = _StateData
    } = GSM,
    _VersionOld,
    VersionNew,
    _StreamProgresses
) ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_update_streams(GSM, _VersionOld, _VersionNew, _StreamProgresses) ->
    %% Unexpected versions or state
    transition(GSM, ?connecting, #{}).
%% Single-version lease renewal from the leader.
%% * `replaying' with the same version: re-arm the lease renewal timeout.
%% * `updating' with the same version: the update is complete, go back to
%%   `replaying' and clear `prev_version'.
%% * Anything else: the message is ignored.
handle_leader_renew_stream_lease(
    #{state := ?replaying, state_data := #{version := CurrentVersion}} = GSM,
    CurrentVersion
) ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(
    #{state := ?updating, state_data := #{version := CurrentVersion} = Data} = GSM,
    CurrentVersion
) ->
    ReplayingData = Data#{prev_version => undefined},
    transition(GSM, ?replaying, ReplayingData);
handle_leader_renew_stream_lease(GSM, _OtherVersion) ->
    GSM.
%% Two-version (old -> new) lease renewal from the leader.
%% * `replaying': accepted when our version equals either of the two versions.
%% * `updating': accepted when both versions equal our `prev_version' and
%%   `version' respectively.
%% In both accepted cases the lease renewal timeout is re-armed; any other
%% combination sends the state machine back to `connecting'.
handle_leader_renew_stream_lease(
    #{state := ?replaying, state_data := #{version := Current}} = GSM, VersionOld, VersionNew
) when VersionOld =:= Current; VersionNew =:= Current ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(
    #{
        state := ?updating,
        state_data := #{prev_version := VersionOld, version := VersionNew}
    } = GSM,
    VersionOld,
    VersionNew
) ->
    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(GSM, _VersionOld, _VersionNew) ->
    transition(GSM, ?connecting, #{}).
%% Report stream progresses collected by the session to the leader.
%%
%% * `connecting': there is no leader yet, the progresses are dropped.
%% * `replaying': progresses are sent together with the current version.
%% * `updating': progresses are sent together with both the previous and the
%%   current version.
%%
%% Every clause returns the (unchanged) group state machine, so that the
%% result can be stored back by the caller the same way as for the other
%% handlers of this module.
handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
    GSM;
handle_stream_progress(
    #{
        state := ?replaying,
        %% `agent' is a top-level field of the group state machine,
        %% it is not part of the per-state data.
        agent := Agent,
        state_data := #{
            leader := Leader,
            version := Version
        }
    } = GSM,
    StreamProgresses
) ->
    ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
        Leader, Agent, StreamProgresses, Version
    ),
    GSM;
handle_stream_progress(
    #{
        state := ?updating,
        agent := Agent,
        state_data := #{
            leader := Leader,
            version := Version,
            prev_version := PrevVersion
        }
    } = GSM,
    StreamProgresses
) ->
    ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
        Leader, Agent, StreamProgresses, PrevVersion, Version
    ),
    GSM.
%% The leader invalidated us: start over from `connecting' with empty state data.
handle_leader_invalidate(GSM) ->
    transition(GSM, ?connecting, #{}).
%%-----------------------------------------------------------------------
%% Internal API
%%-----------------------------------------------------------------------
@ -225,6 +385,9 @@ handle_info(GSM, _Info) ->
%%--------------------------------------------------------------------
transition(GSM0, NewState, NewStateData) ->
transition(GSM0, NewState, NewStateData, []).
transition(GSM0, NewState, NewStateData, LeaseEvents) ->
Timers = maps:get(state_timers, GSM0, #{}),
TimerNames = maps:keys(Timers),
GSM1 = lists:foldl(
@ -237,7 +400,8 @@ transition(GSM0, NewState, NewStateData) ->
GSM2 = GSM1#{
state => NewState,
state_data => NewStateData,
state_timers => #{}
state_timers => #{},
stream_lease_events => LeaseEvents
},
run_enter_callback(GSM2).
@ -280,3 +444,24 @@ run_enter_callback(#{state := ?replaying} = GSM) ->
handle_replaying(GSM);
run_enter_callback(#{state := ?updating} = GSM) ->
handle_updating(GSM).
%% Turn every stream progress (stream + iterator) into a `lease' event,
%% preserving the input order.
progresses_to_lease_events(StreamProgresses) ->
    ToLeaseEvent = fun(#{stream := Stream, iterator := It}) ->
        #{type => lease, stream => Stream, iterator => It}
    end,
    [ToLeaseEvent(Progress) || Progress <- StreamProgresses].
%% Build a `Stream => Iterator' map from a list of stream progresses.
%% If a stream occurs more than once, the last occurrence wins.
progresses_to_map(StreamProgresses) ->
    ToPair = fun(#{stream := Stream, iterator := It}) -> {Stream, It} end,
    maps:from_list([ToPair(Progress) || Progress <- StreamProgresses]).

View File

@ -6,10 +6,12 @@
-behaviour(gen_statem).
-include("emqx_ds_shared_sub_proto.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-include("emqx_ds_shared_sub_proto.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
register/2,
@ -28,10 +30,21 @@
topic_filter := emqx_persistent_session_ds:share_topic_filter()
}.
-type stream_assignment() :: #{
%% Agent states
-define(waiting_replaying, waiting_replaying).
-define(replaying, replaying).
-define(waiting_updating, waiting_updating).
-define(updating, updating).
-type agent_state() :: #{
%% Our view of group gm's status
%% it lags the actual state
state := emqx_ds_shared_sub_agent:status(),
prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
version := emqx_ds_shared_sub_proto:version(),
streams := list(emqx_ds:stream())
streams := list(emqx_ds:stream()),
revoked_streams := list(emqx_ds:stream())
}.
-type data() :: #{
@ -46,10 +59,10 @@
stream_progresses := #{
emqx_ds:stream() => emqx_ds:iterator()
},
agent_stream_assignments := #{
emqx_ds_shared_sub_proto:agent() => stream_assignment()
agents := #{
emqx_ds_shared_sub_proto:agent() => agent_state()
},
stream_assignments := #{
stream_owners := #{
emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
}
}.
@ -61,8 +74,8 @@
%% States
-define(waiting_registration, waiting_registration).
-define(replaying, replaying).
-define(leader_waiting_registration, leader_waiting_registration).
-define(leader_replaying, leader_replaying).
%% Events
@ -71,13 +84,17 @@
}).
-record(renew_streams, {}).
-record(renew_leases, {}).
-record(drop_timeout, {}).
%% Constants
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(RENEW_LEASE_INTERVAL, 5000).
-define(RENEW_STREAMS_INTERVAL, 5000).
-define(RENEW_LEASE_INTERVAL, 1000).
-define(RENEW_STREAMS_INTERVAL, 1000).
-define(DROP_TIMEOUT_INTERVAL, 1000).
-define(AGENT_TIMEOUT, 5000).
%%--------------------------------------------------------------------
%% API
@ -115,17 +132,17 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
Data = #{
group => Group,
topic => Topic,
router_id => router_id(),
router_id => gen_router_id(),
stream_progresses => #{},
stream_assignments => #{},
agent_stream_assignments => #{}
stream_owners => #{},
agents => #{}
},
{ok, ?waiting_registration, Data}.
{ok, ?leader_waiting_registration, Data}.
%%--------------------------------------------------------------------
%% waiting_registration state
handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) ->
handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_registration, Data) ->
Self = self(),
case Fun() of
Self ->
@ -135,25 +152,44 @@ handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration,
end;
%%--------------------------------------------------------------------
%% replaying state
handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
handle_event(enter, _OldState, ?leader_replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
{keep_state_and_data, [
{state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
{state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}},
{state_timeout, 0, #renew_streams{}}
]};
handle_event(state_timeout, #renew_streams{}, ?replaying, Data0) ->
handle_event(state_timeout, #renew_streams{}, ?leader_replaying, Data0) ->
Data1 = renew_streams(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
handle_event(state_timeout, #renew_leases{}, ?replaying, Data0) ->
handle_event(state_timeout, #renew_leases{}, ?leader_replaying, Data0) ->
Data1 = renew_leases(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) ->
handle_event(state_timeout, #drop_timeout{}, ?leader_replaying, Data0) ->
Data1 = drop_timeout_agents(Data0),
{keep_state, Data1, {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}};
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_replaying, Data0) ->
Data1 = connect_agent(Data0, Agent),
{keep_state, Data1};
handle_event(
info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0
info,
?agent_update_stream_states_match(Agent, StreamProgresses, Version),
?leader_replaying,
Data0
) ->
Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version),
Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, Version)
end),
{keep_state, Data1};
handle_event(
info,
?agent_update_stream_states_match(Agent, StreamProgresses, VersionOld, VersionNew),
?leader_replaying,
Data0
) ->
Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew)
end),
{keep_state, Data1};
%%--------------------------------------------------------------------
%% fallback
@ -172,9 +208,16 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%% Event handlers
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Renew streams
%% * Find new streams in DS
%% * Revoke streams from agents having too many streams
%% * Assign streams to agents having too few streams
renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
TopicFilter = emqx_topic:words(Topic),
StartTime = now_ms(),
@ -198,25 +241,109 @@ renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
Progresses,
Streams
),
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% Initiate reassigment
Data1 = Data0#{stream_progresses => NewProgresses},
?SLOG(info, #{
msg => leader_renew_streams,
topic_filter => TopicFilter,
streams => length(Streams)
}),
Data0#{stream_progresses => NewProgresses}.
Data2 = revoke_streams(Data1),
Data3 = assign_streams(Data2),
Data3.
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% This just gives unassigned streams to the connecting agent,
%% we need to implement actual stream (re)assignment.
connect_agent(
%% Revoke streams from agents holding more than the desired number of streams
%% per agent. Only agents in the replaying state are considered.
%% Revoking does not make streams unassigned right away: a stream becomes
%% unassigned only after its agent reports it as acked and unsubscribed.
revoke_streams(Data0) ->
    DesiredCount = desired_streams_per_agent(Data0),
    RevokeFromAgent = fun(Agent, DataAcc) ->
        revoke_excess_streams_from_agent(DataAcc, Agent, DesiredCount)
    end,
    lists:foldl(RevokeFromAgent, Data0, replaying_agents(Data0)).
%% Revoke as many streams from `Agent' as it holds above `DesiredCount'.
%% The agent is asserted to have no revocations in progress.
revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) ->
    AgentState0 = get_agent_state(Data0, Agent),
    #{streams := OwnedStreams, revoked_streams := []} = AgentState0,
    AgentState1 =
        case length(OwnedStreams) - DesiredCount of
            ExcessCount when ExcessCount > 0 ->
                revoke_streams_from_agent(Data0, Agent, AgentState0, ExcessCount);
            _NoExcess ->
                AgentState0
        end,
    set_agent_state(Data0, Agent, AgentState1).
revoke_streams_from_agent(
Data,
Agent,
#{
group := Group,
agent_stream_assignments := AgentStreamAssignments0,
stream_assignments := StreamAssignments0,
stream_progresses := StreamProgresses
} = Data0,
streams := Streams0, revoked_streams := []
} = AgentState0,
RevokeCount
) ->
RevokedStreams = select_streams_for_revoke(Data, AgentState0, RevokeCount),
Streams = Streams0 -- RevokedStreams,
agent_transition_to_waiting_updating(Data, Agent, AgentState0, Streams, RevokedStreams).
%% Pick up to `RevokeCount' of the agent's streams to revoke.
%% Currently the choice is made from a shuffled stream list.
%%
%% TODO
%% A smarter selection should take into account:
%% * shard ids (better spread shards across different streams);
%% * stream stats (how much data was replayed from a stream;
%%   heavy streams should be distributed across different agents);
%% * data locality (agents should preferably keep streams whose data
%%   is available on the agent's node).
select_streams_for_revoke(_Data, #{streams := Streams, revoked_streams := []}, RevokeCount) ->
    Candidates = shuffle(Streams),
    lists:sublist(Candidates, RevokeCount).
%% Assign streams to agents holding fewer than the desired number of streams
%% per agent. Only agents in the replaying state are considered.
assign_streams(Data0) ->
    DesiredCount = desired_streams_per_agent(Data0),
    AssignToAgent = fun(Agent, DataAcc) ->
        assign_lacking_streams(DataAcc, Agent, DesiredCount)
    end,
    lists:foldl(AssignToAgent, Data0, replaying_agents(Data0)).
%% Assign to `Agent' as many streams as it lacks to reach `DesiredCount'.
%% The agent is asserted to have no revocations in progress.
assign_lacking_streams(Data0, Agent, DesiredCount) ->
    #{streams := OwnedStreams, revoked_streams := []} = get_agent_state(Data0, Agent),
    case DesiredCount - length(OwnedStreams) of
        MissingCount when MissingCount > 0 ->
            assign_streams_to_agent(Data0, Agent, MissingCount);
        _NothingMissing ->
            Data0
    end.
%% Select up to `AssignCount' streams, record `Agent' as their owner and move
%% the agent to the waiting_updating state with the extended stream list.
assign_streams_to_agent(Data0, Agent, AssignCount) ->
    NewStreams = select_streams_for_assign(Data0, Agent, AssignCount),
    Data1 = set_stream_ownership_to_agent(Data0, Agent, NewStreams),
    #{agents := #{Agent := AgentState0}} = Data1,
    #{streams := OwnedStreams, revoked_streams := []} = AgentState0,
    AgentState1 = agent_transition_to_waiting_updating(
        Data0, Agent, AgentState0, OwnedStreams ++ NewStreams, []
    ),
    set_agent_state(Data1, Agent, AgentState1).
%% Pick up to `AssignCount' of the currently unassigned streams.
%% TODO
%% A smarter selection should be used here. See `select_streams_for_revoke/3`.
select_streams_for_assign(Data0, _Agent, AssignCount) ->
    Candidates = shuffle(unassigned_streams(Data0)),
    lists:sublist(Candidates, AssignCount).
%%--------------------------------------------------------------------
%% Handle a newly connected agent
connect_agent(
#{group := Group} = Data,
Agent
) ->
?SLOG(info, #{
@ -224,103 +351,382 @@ connect_agent(
agent => Agent,
group => Group
}),
{AgentStreamAssignments, StreamAssignments} =
case AgentStreamAssignments0 of
#{Agent := _} ->
{AgentStreamAssignments0, StreamAssignments0};
_ ->
UnassignedStreams = unassigned_streams(Data0),
Version = 0,
StreamAssignment = #{
prev_version => undefined,
version => Version,
streams => UnassignedStreams
},
AgentStreamAssignments1 = AgentStreamAssignments0#{Agent => StreamAssignment},
StreamAssignments1 = lists:foldl(
fun(Stream, Acc) ->
Acc#{Stream => Agent}
end,
StreamAssignments0,
UnassignedStreams
),
StreamLease = lists:map(
fun(Stream) ->
#{
stream => Stream,
iterator => maps:get(Stream, StreamProgresses)
}
end,
UnassignedStreams
),
?SLOG(info, #{
msg => leader_lease_streams,
agent => Agent,
group => Group,
streams => length(StreamLease),
version => Version
}),
ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, StreamLease, Version
),
{AgentStreamAssignments1, StreamAssignments1}
end,
Data0#{
agent_stream_assignments => AgentStreamAssignments, stream_assignments => StreamAssignments
}.
DesiredCount = desired_streams_per_agent(Data),
assign_initial_streams_to_agent(Data, Agent, DesiredCount).
renew_leases(#{group := Group, agent_stream_assignments := AgentStreamAssignments} = Data) ->
ok = lists:foreach(
fun({Agent, #{version := Version}}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version)
%% Give a newly connected agent its first set of streams: select up to
%% `AssignCount' streams, record the agent as their owner and store the
%% agent's initial (waiting_replaying) state.
assign_initial_streams_to_agent(Data, Agent, AssignCount) ->
    InitialStreams = select_streams_for_assign(Data, Agent, AssignCount),
    DataWithOwners = set_stream_ownership_to_agent(Data, Agent, InitialStreams),
    InitialAgentState = agent_transition_to_initial_waiting_replaying(
        DataWithOwners, Agent, InitialStreams
    ),
    set_agent_state(DataWithOwners, Agent, InitialAgentState).
%%--------------------------------------------------------------------
%% Drop agents that stopped reporting progress
drop_timeout_agents(#{agents := Agents} = Data) ->
Now = now_ms(),
lists:foldl(
fun({Agent, #{update_deadline := Deadline} = _AgentState}, DataAcc) ->
case Deadline < Now of
true ->
?SLOG(info, #{
msg => leader_agent_timeout,
agent => Agent
}),
drop_invalidate_agent(DataAcc, Agent);
false ->
DataAcc
end
end,
maps:to_list(AgentStreamAssignments)
Data,
maps:to_list(Agents)
).
%%--------------------------------------------------------------------
%% Send lease confirmations to agents
%% Send a lease renewal to every known agent. The data is returned unchanged.
renew_leases(#{agents := AgentStates} = Data) ->
    RenewOne = fun({Agent, AgentState}) ->
        renew_lease(Data, Agent, AgentState)
    end,
    ok = lists:foreach(RenewOne, maps:to_list(AgentStates)),
    Data.
update_agent_stream_states(
#{
agent_stream_assignments := AgentStreamAssignments,
stream_assignments := StreamAssignments,
stream_progresses := StreamProgresses0
} = Data0,
Agent,
AgentStreamProgresses,
Version
) ->
AgentVersion = emqx_utils_maps:deep_get([Agent, version], AgentStreamAssignments, undefined),
AgentPrevVersion = emqx_utils_maps:deep_get(
[Agent, prev_version], AgentStreamAssignments, undefined
renew_lease(#{group := Group}, Agent, #{state := ?replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version);
renew_lease(#{group := Group}, Agent, #{state := ?waiting_replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version);
renew_lease(#{group := Group} = Data, Agent, #{
streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion
}) ->
StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, Group, PrevVersion, Version, StreamProgresses
),
case AgentVersion == Version orelse AgentPrevVersion == Version of
false ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% send invalidate to agent
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion);
renew_lease(#{group := Group}, Agent, #{
state := ?updating, version := Version, prev_version := PrevVersion
}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion).
%%--------------------------------------------------------------------
%% Handle stream progress updates from agent in replaying state
update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
#{state := State, version := AgentVersion, prev_version := AgentPrevVersion} =
AgentState0 = get_agent_state(Data0, Agent),
case {State, Version} of
{?waiting_updating, AgentPrevVersion} ->
%% Stale update, ignoring
Data0;
true ->
StreamProgresses1 = lists:foldl(
fun(#{stream := Stream, iterator := It}, ProgressesAcc) ->
%% Assert Stream is assigned to Agent
Agent = maps:get(Stream, StreamAssignments),
ProgressesAcc#{Stream => It}
end,
StreamProgresses0,
AgentStreamProgresses
),
Data0#{stream_progresses => StreamProgresses1}
{?waiting_replaying, AgentVersion} ->
%% Agent finished updating, now replaying
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
AgentState2 = agent_transition_to_replaying(AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{?replaying, AgentVersion} ->
%% Common case, agent is replaying
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
set_agent_state(Data1, Agent, AgentState1);
{OtherState, OtherVersion} ->
?tp(warning, unexpected_update, #{
agent => Agent,
update_version => OtherVersion,
state => OtherState,
our_agent_version => AgentVersion,
our_agent_prev_version => AgentPrevVersion
}),
drop_invalidate_agent(Data0, Agent)
end.
%% Store the iterators reported by `Agent' into `stream_progresses'.
%% Progress for a stream is accepted only if `stream_owners' records `Agent'
%% as the owner of that stream; all other reports are silently ignored.
update_stream_progresses(
    #{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data,
    Agent,
    ReceivedStreamProgresses
) ->
    ApplyProgress = fun(#{stream := Stream, iterator := It}, ProgressesAcc) ->
        case maps:find(Stream, StreamOwners) of
            {ok, Agent} -> ProgressesAcc#{Stream => It};
            _NotOwnedByAgent -> ProgressesAcc
        end
    end,
    StreamProgresses1 = lists:foldl(ApplyProgress, StreamProgresses0, ReceivedStreamProgresses),
    Data#{stream_progresses => StreamProgresses1}.
%% Release the revoked streams that the agent reported with
%% `use_finished := true'. Those streams are unassigned; the remaining ones
%% stay in the agent's `revoked_streams'.
%% Returns `{UpdatedAgentState, UpdatedData}'.
clean_revoked_streams(
    Data0, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses
) ->
    %% Used as a set of streams reported as finished.
    FinishedReported = maps:from_list([
        {Stream, true}
     || #{stream := Stream, use_finished := true} <- ReceivedStreamProgresses
    ]),
    IsFinished = fun(Stream) -> maps:is_key(Stream, FinishedReported) end,
    {FinishedStreams, StillRevokingStreams} = lists:partition(IsFinished, RevokedStreams0),
    Data1 = unassign_streams(Data0, FinishedStreams),
    {AgentState0#{revoked_streams => StillRevokingStreams}, Data1}.
%% Drop the ownership records of the given streams.
%% Streams without an ownership record are ignored.
unassign_streams(#{stream_owners := StreamOwners0} = Data, Streams) ->
    Data#{stream_owners => maps:without(Streams, StreamOwners0)}.
%%--------------------------------------------------------------------
%% Handle stream progress updates sent by an agent that is in the updating
%% (VersionOld -> VersionNew) state.
%%
%% Depending on our view of the agent:
%% * `waiting_updating' / `updating' with matching versions: progresses are
%%   stored, the agent's timeout is refreshed and revoked streams reported as
%%   finished are released. Once no revoked streams remain, the agent moves to
%%   `waiting_replaying'; otherwise it moves to (or stays in) `updating'.
%% * `waiting_replaying' / `replaying' with a matching new version: the agent
%%   lags behind; progresses are stored and the timeout is refreshed.
%% * Anything else is unexpected: the agent is dropped and invalidated.
update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, VersionNew) ->
    #{state := State, version := AgentVersion, prev_version := AgentPrevVersion} =
        AgentState0 = get_agent_state(Data0, Agent),
    case {State, VersionOld, VersionNew} of
        {?waiting_updating, AgentPrevVersion, AgentVersion} ->
            %% Client started updating
            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
            AgentState1 = update_agent_timeout(AgentState0),
            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
            AgentState3 =
                case AgentState2 of
                    %% The key maintained by `clean_revoked_streams/3' is
                    %% `revoked_streams'.
                    #{revoked_streams := []} ->
                        agent_transition_to_waiting_replaying(AgentState2);
                    _ ->
                        agent_transition_to_updating(AgentState2)
                end,
            set_agent_state(Data2, Agent, AgentState3);
        {?updating, AgentPrevVersion, AgentVersion} ->
            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
            AgentState1 = update_agent_timeout(AgentState0),
            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
            AgentState3 =
                case AgentState2 of
                    #{revoked_streams := []} ->
                        agent_transition_to_waiting_replaying(AgentState2);
                    _ ->
                        %% Some revoked streams are still in use by the agent
                        AgentState2
                end,
            set_agent_state(Data2, Agent, AgentState3);
        {?waiting_replaying, _, AgentVersion} ->
            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
            AgentState1 = update_agent_timeout(AgentState0),
            set_agent_state(Data1, Agent, AgentState1);
        {?replaying, _, AgentVersion} ->
            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
            AgentState1 = update_agent_timeout(AgentState0),
            set_agent_state(Data1, Agent, AgentState1);
        {OtherState, OtherVersionOld, OtherVersionNew} ->
            ?tp(warning, unexpected_update, #{
                agent => Agent,
                update_version_old => OtherVersionOld,
                update_version_new => OtherVersionNew,
                state => OtherState,
                our_agent_version => AgentVersion,
                our_agent_prev_version => AgentPrevVersion
            }),
            drop_invalidate_agent(Data0, Agent)
    end.
%%--------------------------------------------------------------------
%% Agent state transitions
%%--------------------------------------------------------------------
%% Move an agent to `waiting_updating` and ask it to switch to a new stream set.
%%
%% Only agents whose `prev_version` is `undefined` are accepted by the function head.
%% The agent version is bumped, the old one is remembered as `prev_version`, and
%% a `leader_update_streams` message carrying the progress of `Streams` is sent.
%% Returns the new agent state (the caller is responsible for storing it).
agent_transition_to_waiting_updating(
    #{group := Group} = Data,
    Agent,
    #{version := Version, prev_version := undefined} = AgentState0,
    Streams,
    RevokedStreams
) ->
    NewVersion = next_version(Version),
    Progresses = stream_progresses(Data, Streams),
    ok = emqx_ds_shared_sub_proto:leader_update_streams(
        Agent, Group, Version, NewVersion, Progresses
    ),
    AgentState0#{
        state => ?waiting_updating,
        version => NewVersion,
        prev_version => Version,
        streams => Streams,
        revoked_streams => RevokedStreams
    }.
%% Put the agent into `waiting_replaying` and clear its list of revoked streams.
agent_transition_to_waiting_replaying(AgentState0) ->
    AgentState1 = AgentState0#{state => ?waiting_replaying},
    AgentState1#{revoked_streams => []}.
%% Build the state of a freshly connected agent.
%%
%% Leases `InitialStreams` to the agent at version 0 (the message also tells the
%% agent who the leader is) and returns the initial agent state in
%% `waiting_replaying` with a fresh update deadline.
agent_transition_to_initial_waiting_replaying(
    #{group := Group} = Data, Agent, InitialStreams
) ->
    InitialVersion = 0,
    Progresses = stream_progresses(Data, InitialStreams),
    ok = emqx_ds_shared_sub_proto:leader_lease_streams(
        Agent, Group, this_leader(Data), Progresses, InitialVersion
    ),
    Deadline = now_ms() + ?AGENT_TIMEOUT,
    #{
        state => ?waiting_replaying,
        version => InitialVersion,
        prev_version => undefined,
        streams => InitialStreams,
        revoked_streams => [],
        update_deadline => Deadline
    }.
%% `waiting_replaying` -> `replaying`; the previous version is forgotten.
%% Fails with `function_clause` for an agent in any other state.
agent_transition_to_replaying(#{state := ?waiting_replaying} = AgentState) ->
    %% `state` is known to exist from the head match, hence the strict `:=` update.
    AgentState#{
        state := ?replaying,
        prev_version => undefined
    }.
%% `waiting_updating` -> `updating`.
%% Fails with `function_clause` for an agent in any other state.
agent_transition_to_updating(#{state := ?waiting_updating} = AgentState) ->
    %% `state` is known to exist from the head match, hence the strict `:=` update.
    AgentState#{state := ?updating}.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
%% Generate a fresh router id: a new GUID rendered as a hex string.
gen_router_id() ->
    emqx_guid:to_hexstr(emqx_guid:gen()).
%% Current Erlang system time, in milliseconds.
now_ms() ->
    NowMs = erlang:system_time(millisecond),
    NowMs.
%% Streams known to the leader (keys of `stream_progresses`) that have no owner
%% (are absent from `stream_owners`).
unassigned_streams(#{stream_progresses := StreamProgresses, stream_owners := StreamOwners}) ->
    Streams = maps:keys(StreamProgresses),
    AssignedStreams = maps:keys(StreamOwners),
    Streams -- AssignedStreams.
%% Agents in the `replaying` state, i.e. those that are neither connecting
%% nor updating (not in a transient state).
replaying_agents(#{agents := AgentStates}) ->
    %% Entries whose state does not match the generator pattern are skipped.
    [Agent || {Agent, #{state := ?replaying}} <- maps:to_list(AgentStates)].
%% How many streams each agent should own: the integer quotient of streams by
%% agents, plus one. Returns 0 when there are no agents.
desired_streams_per_agent(#{agents := AgentStates, stream_progresses := StreamProgresses}) ->
    case maps:size(AgentStates) of
        0 ->
            0;
        AgentCount ->
            maps:size(StreamProgresses) div AgentCount + 1
    end.
%% Pair every stream in `Streams` with the iterator the leader has stored for it.
%% Fails with `{badkey, Stream}` for a stream the leader has no progress for.
stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) ->
    [
        #{stream => Stream, iterator => maps:get(Stream, StreamProgresses)}
     || Stream <- Streams
    ].
%% Versions are consecutive integers; the successor is simply `Version + 1`.
next_version(Version) ->
    1 + Version.
%% Return the elements of `L0` in random order: each element is tagged with a
%% random float, the tagged list is sorted, and the tags are dropped.
shuffle(L0) ->
    Tagged = [{rand:uniform(), Elem} || Elem <- L0],
    [Elem || {_Tag, Elem} <- lists:sort(Tagged)].
%% Record `Agent` as the owner of every stream in `Streams`, replacing any
%% previous owner. Returns `Data` with the updated `stream_owners` map.
set_stream_ownership_to_agent(#{stream_owners := StreamOwners0} = Data, Agent, Streams) ->
    NewOwners = maps:from_list([{Stream, Agent} || Stream <- Streams]),
    %% On key clashes `maps:merge/2` keeps the value from its second argument.
    Data#{stream_owners => maps:merge(StreamOwners0, NewOwners)}.
%% Store (insert or overwrite) the state of `Agent` in the leader data.
set_agent_state(#{agents := Agents} = Data, Agent, AgentState) ->
    UpdatedAgents = Agents#{Agent => AgentState},
    %% `agents` is known to exist from the head match, hence the strict `:=` update.
    Data#{agents := UpdatedAgents}.
%% Push the agent's `update_deadline` to `?AGENT_TIMEOUT` milliseconds from now.
update_agent_timeout(AgentState) ->
    Deadline = now_ms() + ?AGENT_TIMEOUT,
    AgentState#{update_deadline => Deadline}.
%% Look up the state of a known agent.
%% Fails with `{badkey, Agent}` if the agent is not registered.
get_agent_state(#{agents := Agents} = _Data, Agent) ->
    AgentState = maps:get(Agent, Agents),
    AgentState.
%% Identity of this leader as announced to agents: the pid of the calling process.
this_leader(_Data) ->
    erlang:self().
%% Remove `Agent` from the leader data and release every stream it holds,
%% both the assigned ones and the ones still being revoked.
drop_agent(#{agents := Agents} = Data0, Agent) ->
    #{streams := Streams, revoked_streams := RevokedStreams} = get_agent_state(Data0, Agent),
    Data1 = unassign_streams(Data0, Streams ++ RevokedStreams),
    Data1#{agents => maps:remove(Agent, Agents)}.
%% Send the agent a `leader_invalidate` message for this leader's group.
%% Returns `ok`.
invalidate_agent(#{group := Group}, Agent) ->
    ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, Group),
    ok.
%% Forget the agent (releasing its streams) and then notify it that it has been
%% invalidated. Returns the leader data without the agent.
drop_invalidate_agent(Data0, Agent) ->
    DataWithoutAgent = drop_agent(Data0, Agent),
    ok = invalidate_agent(DataWithoutAgent, Agent),
    DataWithoutAgent.
%% Run `Fun` only if `Agent` is known to the leader; otherwise return `Data` unchanged.
%% `Fun` takes no arguments and its result is returned as is.
with_agent(#{agents := Agents}, Agent, Fun) when is_map_key(Agent, Agents) ->
    Fun();
with_agent(#{agents := _} = Data, _Agent, _Fun) ->
    Data.

View File

@ -13,9 +13,13 @@
%% Protocol API: agent -> leader and leader -> agent message senders.
-export([
    agent_connect_leader/3,
    agent_update_stream_states/4,
    agent_update_stream_states/5,
    leader_lease_streams/5,
    leader_renew_stream_lease/3,
    leader_renew_stream_lease/4,
    leader_update_streams/5,
    leader_invalidate/2
]).
%% An agent is addressed by a process identifier.
-type agent() :: pid().
@ -29,6 +33,12 @@
iterator := emqx_ds:iterator()
}.
%% Progress of a single stream as reported by an agent: the stream, an iterator
%% for it, and a boolean `use_finished` flag.
%% NOTE(review): `use_finished` presumably means the agent no longer uses the
%% stream (unsubscribed and fully acked) — confirm against the agent side.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
agent/0,
leader/0,
@ -44,20 +54,27 @@ agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
ok.
%% Agent -> leader: report stream progress for a single, settled `Version`.
%% The message is sent asynchronously; always returns `ok`.
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
    _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
    ok.
%% ...
%% Agent -> leader: report stream progress while moving from `VersionOld` to
%% `VersionNew`. The message is sent asynchronously; always returns `ok`.
-spec agent_update_stream_states(
    leader(), agent(), list(agent_stream_progress()), version(), version()
) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
    Msg = ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew),
    _ = ToLeader ! Msg,
    ok.
%% leader -> agent messages

%% Lease `Streams` of group `OfGroup` to the agent at `Version`.
%% `Leader` is included in the message so that the agent learns who sent it.
%% Always returns `ok`.
-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
    _ = emqx_persistent_session_ds_shared_subs_agent:send(
        ToAgent,
        ?leader_lease_streams(OfGroup, Leader, Streams, Version)
    ),
    ok.
@ -69,4 +86,26 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
),
ok.
%% ...
%% Leader -> agent: renew the stream lease of group `OfGroup` for an agent that
%% is between `VersionOld` and `VersionNew`. Always returns `ok`.
-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
    Msg = ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew),
    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Msg),
    ok.
%% Leader -> agent: ask the agent to move from `VersionOld` to `VersionNew`,
%% where `StreamsNew` is the stream set of the new version. Always returns `ok`.
-spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok.
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
    Msg = ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew),
    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Msg),
    ok.
%% Leader -> agent: send a `leader_invalidate` message for group `OfGroup`.
%% Always returns `ok`.
-spec leader_invalidate(agent(), group()) -> ok.
leader_invalidate(ToAgent, OfGroup) ->
    Msg = ?leader_invalidate(OfGroup),
    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Msg),
    ok.

View File

@ -49,6 +49,22 @@
agent := Agent
}).
%% Agent -> leader stream state report carrying two versions (old and new).
%% Constructor form: builds the message map.
-define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
type => ?agent_update_stream_states_msg,
stream_states => StreamStates,
version_old => VersionOld,
version_new => VersionNew,
agent => Agent
}).
%% Pattern form of the message above, for use in matches and function heads.
-define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
type := ?agent_update_stream_states_msg,
stream_states := StreamStates,
version_old := VersionOld,
version_new := VersionNew,
agent := Agent
}).
%% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders
%% `group` field is used to identify the leader.
@ -56,17 +72,19 @@
%% Values of the `type` field of leader -> agent messages.
-define(leader_lease_streams_msg, leader_lease_streams).
-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
%% Leader -> agent stream lease. Besides the group, streams and version, the
%% message carries the `leader` that sent it.
%% Constructor form: builds the message map.
-define(leader_lease_streams(Group, Leader, Streams, Version), #{
    type => ?leader_lease_streams_msg,
    streams => Streams,
    version => Version,
    leader => Leader,
    group => Group
}).

%% Pattern form of the message above, for use in matches and function heads.
-define(leader_lease_streams_match(Group, Leader, Streams, Version), #{
    type := ?leader_lease_streams_msg,
    streams := Streams,
    version := Version,
    leader := Leader,
    group := Group
}).
@ -82,4 +100,44 @@
group := Group
}).
%% Leader -> agent lease renewal carrying two versions (old and new).
%% Uses the same `type` tag as other `leader_renew_stream_lease` messages and
%% differs from them by the `version_old` / `version_new` keys.
%% Constructor form: builds the message map.
-define(leader_renew_stream_lease(Group, VersionOld, VersionNew), #{
type => ?leader_renew_stream_lease_msg,
version_old => VersionOld,
version_new => VersionNew,
group => Group
}).
%% Pattern form of the message above, for use in matches and function heads.
-define(leader_renew_stream_lease_match(Group, VersionOld, VersionNew), #{
type := ?leader_renew_stream_lease_msg,
version_old := VersionOld,
version_new := VersionNew,
group := Group
}).
%% Leader -> agent request to move from `VersionOld` to `VersionNew`;
%% `streams_new` carries the streams for the new version.
%% Constructor form: builds the message map.
-define(leader_update_streams(Group, VersionOld, VersionNew, StreamsNew), #{
type => leader_update_streams,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew,
group => Group
}).
%% Pattern form of the message above, for use in matches and function heads.
-define(leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew), #{
type := leader_update_streams,
version_old := VersionOld,
version_new := VersionNew,
streams_new := StreamsNew,
group := Group
}).
%% Leader -> agent invalidation notice for a group; carries no other payload.
%% Constructor form: builds the message map.
-define(leader_invalidate(Group), #{
type => leader_invalidate,
group => Group
}).
%% Pattern form of the message above, for use in matches and function heads.
-define(leader_invalidate_match(Group), #{
type := leader_invalidate,
group := Group
}).
-endif.