From 082514f557900dd93c1945c12f71254700417f50 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 26 Jun 2024 14:19:06 +0300 Subject: [PATCH] feat(queue): implement full protocol between agent and leader --- ...emqx_persistent_session_ds_shared_subs.erl | 7 +- .../src/emqx_ds_shared_sub_agent.erl | 72 +- .../src/emqx_ds_shared_sub_group_sm.erl | 285 ++++++-- .../src/emqx_ds_shared_sub_leader.erl | 634 ++++++++++++++---- .../src/emqx_ds_shared_sub_proto.erl | 55 +- .../src/emqx_ds_shared_sub_proto.hrl | 62 +- 6 files changed, 933 insertions(+), 182 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index c4e929640..f3aaa146e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -123,10 +123,12 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> Progress = fold_shared_stream_states( fun(TopicFilter, Stream, SRS, Acc) -> #srs{it_begin = BeginIt} = SRS, + StreamProgress = #{ topic_filter => TopicFilter, stream => Stream, - iterator => BeginIt + iterator => BeginIt, + use_finished => is_use_finished(S, SRS) }, [StreamProgress | Acc] end, @@ -336,3 +338,6 @@ agent_opts(#{session_id := SessionId}) -> -dialyzer({nowarn_function, now_ms/0}). now_ms() -> erlang:system_time(millisecond). + +is_use_finished(S, #srs{unsubscribed = Unsubscribed} = SRS) -> + Unsubscribed andalso emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). 
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 29745aa4a..6e43e0a65 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -56,20 +56,30 @@ on_unsubscribe(State, TopicFilter) -> renew_streams(#{} = State) -> fetch_stream_events(State). -on_stream_progress(State, _StreamProgress) -> - %% TODO https://emqx.atlassian.net/browse/EMQX-12572 - %% Send to leader - State. +on_stream_progress(State, StreamProgresses) -> + ProgressesByGroup = stream_progresses_by_group(StreamProgresses), + lists:foldl( + fun({Group, GroupProgresses}, StateAcc) -> + with_group_sm(StateAcc, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses) + end) + end, + State, + maps:to_list(ProgressesByGroup) + ). -on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) -> +on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) -> ?SLOG(info, #{ msg => leader_lease_streams, group => Group, streams => StreamProgresses, - version => Version + version => Version, + leader => Leader }), with_group_sm(State, Group, fun(GSM) -> - emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version) + emqx_ds_shared_sub_group_sm:handle_leader_lease_streams( + GSM, Leader, StreamProgresses, Version + ) end); on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> ?SLOG(info, #{ @@ -80,6 +90,37 @@ on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> with_group_sm(State, Group, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) end); +on_info(State, ?leader_renew_stream_lease_match(Group, VersionOld, VersionNew)) -> + ?SLOG(info, #{ + msg => leader_renew_stream_lease, + group => Group, + version_old => VersionOld, + version_new => VersionNew + }), + with_group_sm(State, Group, 
fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) + end); +on_info(State, ?leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew)) -> + ?SLOG(info, #{ + msg => leader_update_streams, + group => Group, + version_old => VersionOld, + version_new => VersionNew, + streams_new => StreamsNew + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_update_streams( + GSM, VersionOld, VersionNew, StreamsNew + ) + end); +on_info(State, ?leader_invalidate_match(Group)) -> + ?SLOG(info, #{ + msg => leader_invalidate, + group => Group + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_invalidate(GSM) + end); %% Generic messages sent by group_sm's to themselves (timeouts). on_info(State, #message_to_group_sm{group = Group, message = Message}) -> with_group_sm(State, Group, fun(GSM) -> @@ -156,3 +197,20 @@ with_group_sm(State, Group, Fun) -> %% Error? State end. + +stream_progresses_by_group(StreamProgresses) -> + lists:foldl( + fun(#{topic_filter := #share{group = Group}} = Progress0, Acc) -> + Progress1 = maps:remove(topic_filter, Progress0), + maps:update_with( + Group, + fun(GroupStreams0) -> + [Progress1 | GroupStreams0] + end, + [Progress1], + Acc + ) + end, + #{}, + StreamProgresses + ). 
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 5ddff8518..eb13e7147 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -16,14 +16,24 @@ new/1, %% Leader messages - handle_leader_lease_streams/3, + handle_leader_lease_streams/4, handle_leader_renew_stream_lease/2, + handle_leader_renew_stream_lease/3, + handle_leader_update_streams/4, + handle_leader_invalidate/1, %% Self-initiated messages handle_info/2, %% API - fetch_stream_events/1 + fetch_stream_events/1, + handle_stream_progress/2 +]). + +-export_type([ + group_sm/0, + options/0, + state/0 ]). -type options() :: #{ @@ -32,7 +42,31 @@ send_after := fun((non_neg_integer(), term()) -> reference()) }. -%% Subscription states +-type stream_lease_event() :: + #{ + type => lease, + stream => emqx_ds:stream(), + iterator => emqx_ds:iterator() + } + | #{ + type => revoke, + stream => emqx_ds:stream() + }. + +-type external_lease_event() :: + #{ + type => lease, + stream => emqx_ds:stream(), + iterator => emqx_ds:iterator(), + topic_filter => emqx_persistent_session_ds:share_topic_filter() + } + | #{ + type => revoke, + stream => emqx_ds:stream(), + topic_filter => emqx_persistent_session_ds:share_topic_filter() + }. + +%% GroupSM States -define(connecting, connecting). -define(replaying, replaying). @@ -40,26 +74,47 @@ -type state() :: ?connecting | ?replaying | ?updating. --type group_sm() :: #{ - topic_filter => emqx_persistent_session_ds:share_topic_filter(), - agent => emqx_ds_shared_sub_proto:agent(), - send_after => fun((non_neg_integer(), term()) -> reference()), - - state => state(), - state_data => map(), - state_timers => map() +-type connecting_data() :: #{}. 
+-type replaying_data() :: #{ + leader => emqx_ds_shared_sub_proto:leader(), + streams => #{emqx_ds:stream() => emqx_ds:iterator()}, + version => emqx_ds_shared_sub_proto:version(), + prev_version => undefined }. +-type updating_data() :: #{ + leader => emqx_ds_shared_sub_proto:leader(), + streams => #{emqx_ds:stream() => emqx_ds:iterator()}, + version => emqx_ds_shared_sub_proto:version(), + prev_version => emqx_ds_shared_sub_proto:version() +}. + +-type state_data() :: connecting_data() | replaying_data() | updating_data(). -record(state_timeout, { id :: reference(), name :: atom(), message :: term() }). + -record(timer, { ref :: reference(), id :: reference() }). +-type timer_name() :: atom(). +-type timer() :: #timer{}. + +-type group_sm() :: #{ + topic_filter => emqx_persistent_session_ds:share_topic_filter(), + agent => emqx_ds_shared_sub_proto:agent(), + send_after => fun((non_neg_integer(), term()) -> reference()), + stream_lease_events => list(stream_lease_event()), + + state => state(), + state_data => state_data(), + state_timers => #{timer_name() => timer()} +}. + %%----------------------------------------------------------------------- %% Constants %%----------------------------------------------------------------------- @@ -94,11 +149,12 @@ new(#{ }, transition(GSM0, ?connecting, #{}). +-spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}. fetch_stream_events( #{ - state := ?replaying, + state := _State, topic_filter := TopicFilter, - state_data := #{stream_lease_events := Events0} = Data + stream_lease_events := Events0 } = GSM ) -> Events1 = lists:map( @@ -107,14 +163,7 @@ fetch_stream_events( end, Events0 ), - { - GSM#{ - state_data => Data#{stream_lease_events => []} - }, - Events1 - }; -fetch_stream_events(GSM) -> - {GSM, []}. + {GSM#{stream_lease_events => []}, Events1}. 
%%----------------------------------------------------------------------- %% Event Handlers @@ -128,37 +177,23 @@ handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). handle_leader_lease_streams( - #{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version + #{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version ) -> ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}), - Streams = lists:foldl( - fun(#{stream := Stream, iterator := It}, Acc) -> - Acc#{Stream => It} - end, - #{}, - StreamProgresses - ), - StreamLeaseEvents = lists:map( - fun(#{stream := Stream, iterator := It}) -> - #{ - type => lease, - stream => Stream, - iterator => It - } - end, - StreamProgresses - ), + Streams = progresses_to_map(StreamProgresses), + StreamLeaseEvents = progresses_to_lease_events(StreamProgresses), transition( GSM0, ?replaying, #{ + leader => Leader, streams => Streams, - stream_lease_events => StreamLeaseEvents, prev_version => undefined, version => Version - } + }, + StreamLeaseEvents ); -handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> +handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) -> GSM. handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> @@ -172,13 +207,6 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0 handle_replaying(GSM) -> ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT). -handle_leader_renew_stream_lease( - #{state := ?replaying, state_data := #{version := Version}} = GSM, Version -) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); -handle_leader_renew_stream_lease(GSM, _Version) -> - GSM. - handle_renew_lease_timeout(GSM) -> ?tp(debug, renew_lease_timeout, #{}), transition(GSM, ?connecting, #{}). 
@@ -187,8 +215,140 @@ handle_renew_lease_timeout(GSM) -> %% Updating state handle_updating(GSM) -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT). + +%%----------------------------------------------------------------------- +%% Common handlers + +handle_leader_update_streams( + #{ + state := ?replaying, + stream_data := #{streams := Streams0, version := VersionOld} = StateData + } = GSM, + VersionOld, + VersionNew, + StreamProgresses +) -> + {AddEvents, Streams1} = lists:foldl( + fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) -> + case maps:is_key(Stream, StreamsAcc) of + true -> + %% We prefer our own progress + {AddEventAcc, StreamsAcc}; + false -> + { + [#{type => lease, stream => Stream, iterator => It} | AddEventAcc], + StreamsAcc#{Stream => It} + } + end + end, + {[], Streams0}, + StreamProgresses + ), + NewStreamMap = progresses_to_map(StreamProgresses), + {RevokeEvents, Streams2} = lists:foldl( + fun(Stream, {RevokeEventAcc, StreamsAcc}) -> + case maps:is_key(Stream, NewStreamMap) of + true -> + {RevokeEventAcc, StreamsAcc}; + false -> + { + [#{type => revoke, stream => Stream} | RevokeEventAcc], + maps:remove(Stream, StreamsAcc) + } + end + end, + {[], Streams1}, + maps:keys(Streams1) + ), + StreamLeaseEvents = AddEvents ++ RevokeEvents, + transition( + GSM, + ?updating, + StateData#{ + streams => Streams2, + prev_version => VersionOld, + version => VersionNew + }, + StreamLeaseEvents + ); +handle_leader_update_streams( + #{ + state := ?updating, + stream_data := #{version := VersionNew} = _StreamData + } = GSM, + _VersionOld, + VersionNew, + _StreamProgresses +) -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_update_streams(GSM, _VersionOld, _VersionNew, _StreamProgresses) -> + %% Unexpected versions or state + transition(GSM, ?connecting, #{}). 
+ +handle_leader_renew_stream_lease( + #{state := ?replaying, state_data := #{version := Version}} = GSM, Version +) -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_renew_stream_lease( + #{state := ?updating, state_data := #{version := Version} = StateData} = GSM, Version +) -> + transition( + GSM, + ?replaying, + StateData#{prev_version => undefined} + ); +handle_leader_renew_stream_lease(GSM, _Version) -> GSM. +handle_leader_renew_stream_lease( + #{state := ?replaying, state_data := #{version := Version}} = GSM, VersionOld, VersionNew +) when VersionOld =:= Version orelse VersionNew =:= Version -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_renew_stream_lease( + #{state := ?updating, state_data := #{version := VersionNew, prev_version := VersionOld}} = GSM, + VersionOld, + VersionNew +) -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); +handle_leader_renew_stream_lease(GSM, _VersionOld, _VersionNew) -> + transition(GSM, ?connecting, #{}). + +handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> + GSM; +handle_stream_progress( + #{ + state := ?replaying, + state_data := #{ + agent := Agent, + leader := Leader, + version := Version + } + } = _GSM, + StreamProgresses +) -> + ok = emqx_ds_shared_sub_proto:agent_update_stream_states( + Leader, Agent, StreamProgresses, Version + ); +handle_stream_progress( + #{ + state := ?updating, + state_data := #{ + agent := Agent, + leader := Leader, + version := Version, + prev_version := PrevVersion + } + } = _GSM, + StreamProgresses +) -> + ok = emqx_ds_shared_sub_proto:agent_update_stream_states( + Leader, Agent, StreamProgresses, PrevVersion, Version + ). + +handle_leader_invalidate(GSM) -> + transition(GSM, ?connecting, #{}). 
+ %%----------------------------------------------------------------------- %% Internal API %%----------------------------------------------------------------------- @@ -225,6 +385,9 @@ handle_info(GSM, _Info) -> %%-------------------------------------------------------------------- transition(GSM0, NewState, NewStateData) -> + transition(GSM0, NewState, NewStateData, []). + +transition(GSM0, NewState, NewStateData, LeaseEvents) -> Timers = maps:get(state_timers, GSM0, #{}), TimerNames = maps:keys(Timers), GSM1 = lists:foldl( @@ -237,7 +400,8 @@ transition(GSM0, NewState, NewStateData) -> GSM2 = GSM1#{ state => NewState, state_data => NewStateData, - state_timers => #{} + state_timers => #{}, + stream_lease_events => LeaseEvents }, run_enter_callback(GSM2). @@ -280,3 +444,24 @@ run_enter_callback(#{state := ?replaying} = GSM) -> handle_replaying(GSM); run_enter_callback(#{state := ?updating} = GSM) -> handle_updating(GSM). + +progresses_to_lease_events(StreamProgresses) -> + lists:map( + fun(#{stream := Stream, iterator := It}) -> + #{ + type => lease, + stream => Stream, + iterator => It + } + end, + StreamProgresses + ). + +progresses_to_map(StreamProgresses) -> + lists:foldl( + fun(#{stream := Stream, iterator := It}, Acc) -> + Acc#{Stream => It} + end, + #{}, + StreamProgresses + ). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 5323595cf..3f2a85424 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -6,10 +6,12 @@ -behaviour(gen_statem). +-include("emqx_ds_shared_sub_proto.hrl"). + -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_persistent_message.hrl"). --include("emqx_ds_shared_sub_proto.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). 
-export([ register/2, @@ -28,10 +30,21 @@ topic_filter := emqx_persistent_session_ds:share_topic_filter() }. --type stream_assignment() :: #{ +%% Agent states + +-define(waiting_replaying, waiting_replaying). +-define(replaying, replaying). +-define(waiting_updating, waiting_updating). +-define(updating, updating). + +-type agent_state() :: #{ + %% Our view of group gm's status + %% it lags the actual state + state := emqx_ds_shared_sub_agent:status(), prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), version := emqx_ds_shared_sub_proto:version(), - streams := list(emqx_ds:stream()) + streams := list(emqx_ds:stream()), + revoked_streams := list(emqx_ds:stream()) }. -type data() :: #{ @@ -46,10 +59,10 @@ stream_progresses := #{ emqx_ds:stream() => emqx_ds:iterator() }, - agent_stream_assignments := #{ - emqx_ds_shared_sub_proto:agent() => stream_assignment() + agents := #{ + emqx_ds_shared_sub_proto:agent() => agent_state() }, - stream_assignments := #{ + stream_owners := #{ emqx_ds:stream() => emqx_ds_shared_sub_proto:agent() } }. @@ -61,8 +74,8 @@ %% States --define(waiting_registration, waiting_registration). --define(replaying, replaying). +-define(leader_waiting_registration, leader_waiting_registration). +-define(leader_replaying, leader_replaying). %% Events @@ -71,13 +84,17 @@ }). -record(renew_streams, {}). -record(renew_leases, {}). +-record(drop_timeout, {}). %% Constants %% TODO https://emqx.atlassian.net/browse/EMQX-12574 %% Move to settings --define(RENEW_LEASE_INTERVAL, 5000). --define(RENEW_STREAMS_INTERVAL, 5000). +-define(RENEW_LEASE_INTERVAL, 1000). +-define(RENEW_STREAMS_INTERVAL, 1000). +-define(DROP_TIMEOUT_INTERVAL, 1000). + +-define(AGENT_TIMEOUT, 5000). 
%%-------------------------------------------------------------------- %% API @@ -115,17 +132,17 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> Data = #{ group => Group, topic => Topic, - router_id => router_id(), + router_id => gen_router_id(), stream_progresses => #{}, - stream_assignments => #{}, - agent_stream_assignments => #{} + stream_owners => #{}, + agents => #{} }, - {ok, ?waiting_registration, Data}. + {ok, ?leader_waiting_registration, Data}. %%-------------------------------------------------------------------- %% waiting_registration state -handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) -> +handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_registration, Data) -> Self = self(), case Fun() of Self -> @@ -135,25 +152,44 @@ handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, end; %%-------------------------------------------------------------------- %% repalying state -handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) -> +handle_event(enter, _OldState, ?leader_replaying, #{topic := Topic, router_id := RouterId} = _Data) -> ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId), {keep_state_and_data, [ {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}, + {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}, {state_timeout, 0, #renew_streams{}} ]}; -handle_event(state_timeout, #renew_streams{}, ?replaying, Data0) -> +handle_event(state_timeout, #renew_streams{}, ?leader_replaying, Data0) -> Data1 = renew_streams(Data0), {keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}}; -handle_event(state_timeout, #renew_leases{}, ?replaying, Data0) -> +handle_event(state_timeout, #renew_leases{}, ?leader_replaying, Data0) -> Data1 = renew_leases(Data0), {keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}}; -handle_event(info, 
?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) -> +handle_event(state_timeout, #drop_timeout{}, ?leader_replaying, Data0) -> + Data1 = drop_timeout_agents(Data0), + {keep_state, Data1, {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; +handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_replaying, Data0) -> Data1 = connect_agent(Data0, Agent), {keep_state, Data1}; handle_event( - info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0 + info, + ?agent_update_stream_states_match(Agent, StreamProgresses, Version), + ?leader_replaying, + Data0 ) -> - Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version), + Data1 = with_agent(Data0, Agent, fun() -> + update_agent_stream_states(Data0, Agent, StreamProgresses, Version) + end), + {keep_state, Data1}; +handle_event( + info, + ?agent_update_stream_states_match(Agent, StreamProgresses, VersionOld, VersionNew), + ?leader_replaying, + Data0 +) -> + Data1 = with_agent(Data0, Agent, fun() -> + update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew) + end), {keep_state, Data1}; %%-------------------------------------------------------------------- %% fallback @@ -172,9 +208,16 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> ok. 
%%-------------------------------------------------------------------- -%% Internal functions +%% Event handlers %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% Renew streams + +%% * Find new streams in DS +%% * Revoke streams from agents having too many streams +%% * Assign streams to agents having too few streams + renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) -> TopicFilter = emqx_topic:words(Topic), StartTime = now_ms(), @@ -198,25 +241,109 @@ renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) -> Progresses, Streams ), - %% TODO https://emqx.atlassian.net/browse/EMQX-12572 - %% Initiate reassigment + Data1 = Data0#{stream_progresses => NewProgresses}, ?SLOG(info, #{ msg => leader_renew_streams, topic_filter => TopicFilter, streams => length(Streams) }), - Data0#{stream_progresses => NewProgresses}. + Data2 = revoke_streams(Data1), + Data3 = assign_streams(Data2), + Data3. -%% TODO https://emqx.atlassian.net/browse/EMQX-12572 -%% This just gives unassigned streams to the connecting agent, -%% we need to implement actual stream (re)assignment. -connect_agent( +%% We revoke streams from agents that have too many streams (> desired_streams_per_agent). +%% We revoke only from replaying agents. +%% After revoking, no unassigned streams appear. Streams will become unassigned +%% only after agents report them as acked and unsubscribed. +revoke_streams(Data0) -> + DesiredStreamsPerAgent = desired_streams_per_agent(Data0), + Agents = replaying_agents(Data0), + lists:foldl( + fun(Agent, DataAcc) -> + revoke_excess_streams_from_agent(DataAcc, Agent, DesiredStreamsPerAgent) + end, + Data0, + Agents + ). 
+ +revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) -> + #{streams := Streams0, revoked_streams := []} = AgentState0 = get_agent_state(Data0, Agent), + RevokeCount = length(Streams0) - DesiredCount, + AgentState1 = + case RevokeCount > 0 of + false -> + AgentState0; + true -> + revoke_streams_from_agent(Data0, Agent, AgentState0, RevokeCount) + end, + set_agent_state(Data0, Agent, AgentState1). + +revoke_streams_from_agent( + Data, + Agent, #{ - group := Group, - agent_stream_assignments := AgentStreamAssignments0, - stream_assignments := StreamAssignments0, - stream_progresses := StreamProgresses - } = Data0, + streams := Streams0, revoked_streams := [] + } = AgentState0, + RevokeCount +) -> + RevokedStreams = select_streams_for_revoke(Data, AgentState0, RevokeCount), + Streams = Streams0 -- RevokedStreams, + agent_transition_to_waiting_updating(Data, Agent, AgentState0, Streams, RevokedStreams). + +select_streams_for_revoke( + _Data, #{streams := Streams, revoked_streams := []} = _AgentState, RevokeCount +) -> + %% TODO + %% Some intellectual logic should be used regarding: + %% * shard ids (better spread shards across different streams); + %% * stream stats (how much data was replayed from stream, + %% heavy streams should be distributed across different agents); + %% * data locality (agents better preserve streams with data available on the agent's node) + lists:sublist(shuffle(Streams), RevokeCount). + +%% We assign streams to agents that have too few streams (< desired_streams_per_agent). +%% We assign only to replaying agents. +assign_streams(Data0) -> + DesiredStreamsPerAgent = desired_streams_per_agent(Data0), + Agents = replaying_agents(Data0), + lists:foldl( + fun(Agent, DataAcc) -> + assign_lacking_streams(DataAcc, Agent, DesiredStreamsPerAgent) + end, + Data0, + Agents + ). 
+ +assign_lacking_streams(Data0, Agent, DesiredCount) -> + #{streams := Streams0, revoked_streams := []} = get_agent_state(Data0, Agent), + AssignCount = DesiredCount - length(Streams0), + case AssignCount > 0 of + false -> + Data0; + true -> + assign_streams_to_agent(Data0, Agent, AssignCount) + end. + +assign_streams_to_agent(Data0, Agent, AssignCount) -> + StreamsToAssign = select_streams_for_assign(Data0, Agent, AssignCount), + Data1 = set_stream_ownership_to_agent(Data0, Agent, StreamsToAssign), + #{agents := #{Agent := AgentState0}} = Data1, + #{streams := Streams0, revoked_streams := []} = AgentState0, + Streams1 = Streams0 ++ StreamsToAssign, + AgentState1 = agent_transition_to_waiting_updating(Data0, Agent, AgentState0, Streams1, []), + set_agent_state(Data1, Agent, AgentState1). + +select_streams_for_assign(Data0, _Agent, AssignCount) -> + %% TODO + %% Some intellectual logic should be used. See `select_streams_for_revoke/3`. + UnassignedStreams = unassigned_streams(Data0), + lists:sublist(shuffle(UnassignedStreams), AssignCount). 
+ +%%-------------------------------------------------------------------- +%% Handle a newly connected agent + +connect_agent( + #{group := Group} = Data, Agent ) -> ?SLOG(info, #{ @@ -224,103 +351,382 @@ connect_agent( agent => Agent, group => Group }), - {AgentStreamAssignments, StreamAssignments} = - case AgentStreamAssignments0 of - #{Agent := _} -> - {AgentStreamAssignments0, StreamAssignments0}; - _ -> - UnassignedStreams = unassigned_streams(Data0), - Version = 0, - StreamAssignment = #{ - prev_version => undefined, - version => Version, - streams => UnassignedStreams - }, - AgentStreamAssignments1 = AgentStreamAssignments0#{Agent => StreamAssignment}, - StreamAssignments1 = lists:foldl( - fun(Stream, Acc) -> - Acc#{Stream => Agent} - end, - StreamAssignments0, - UnassignedStreams - ), - StreamLease = lists:map( - fun(Stream) -> - #{ - stream => Stream, - iterator => maps:get(Stream, StreamProgresses) - } - end, - UnassignedStreams - ), - ?SLOG(info, #{ - msg => leader_lease_streams, - agent => Agent, - group => Group, - streams => length(StreamLease), - version => Version - }), - ok = emqx_ds_shared_sub_proto:leader_lease_streams( - Agent, Group, StreamLease, Version - ), - {AgentStreamAssignments1, StreamAssignments1} - end, - Data0#{ - agent_stream_assignments => AgentStreamAssignments, stream_assignments => StreamAssignments - }. + DesiredCount = desired_streams_per_agent(Data), + assign_initial_streams_to_agent(Data, Agent, DesiredCount). 
-renew_leases(#{group := Group, agent_stream_assignments := AgentStreamAssignments} = Data) -> - ok = lists:foreach( - fun({Agent, #{version := Version}}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version) +assign_initial_streams_to_agent(Data, Agent, AssignCount) -> + InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount), + Data1 = set_stream_ownership_to_agent(Data, Agent, InitialStreamsToAssign), + AgentState = agent_transition_to_initial_waiting_replaying( + Data1, Agent, InitialStreamsToAssign + ), + set_agent_state(Data1, Agent, AgentState). + +%%-------------------------------------------------------------------- +%% Drop agents that stopped reporting progress + +drop_timeout_agents(#{agents := Agents} = Data) -> + Now = now_ms(), + lists:foldl( + fun({Agent, #{update_deadline := Deadline} = _AgentState}, DataAcc) -> + case Deadline < Now of + true -> + ?SLOG(info, #{ + msg => leader_agent_timeout, + agent => Agent + }), + drop_invalidate_agent(DataAcc, Agent); + false -> + DataAcc + end end, - maps:to_list(AgentStreamAssignments) + Data, + maps:to_list(Agents) + ). + +%%-------------------------------------------------------------------- +%% Send lease confirmations to agents + +renew_leases(#{agents := AgentStates} = Data) -> + ok = lists:foreach( + fun({Agent, AgentState}) -> + renew_lease(Data, Agent, AgentState) + end, + maps:to_list(AgentStates) ), Data. 
-update_agent_stream_states( - #{ - agent_stream_assignments := AgentStreamAssignments, - stream_assignments := StreamAssignments, - stream_progresses := StreamProgresses0 - } = Data0, - Agent, - AgentStreamProgresses, - Version -) -> - AgentVersion = emqx_utils_maps:deep_get([Agent, version], AgentStreamAssignments, undefined), - AgentPrevVersion = emqx_utils_maps:deep_get( - [Agent, prev_version], AgentStreamAssignments, undefined +renew_lease(#{group := Group}, Agent, #{state := ?replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version); +renew_lease(#{group := Group}, Agent, #{state := ?waiting_replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version); +renew_lease(#{group := Group} = Data, Agent, #{ + streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion +}) -> + StreamProgresses = stream_progresses(Data, Streams), + ok = emqx_ds_shared_sub_proto:leader_update_streams( + Agent, Group, PrevVersion, Version, StreamProgresses ), - case AgentVersion == Version orelse AgentPrevVersion == Version of - false -> - %% TODO https://emqx.atlassian.net/browse/EMQX-12572 - %% send invalidate to agent + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion); +renew_lease(#{group := Group}, Agent, #{ + state := ?updating, version := Version, prev_version := PrevVersion +}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion). 
+ +%%-------------------------------------------------------------------- +%% Handle stream progress updates from agent in replaying state + +update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) -> + #{state := State, version := AgentVersion, prev_version := AgentPrevVersion} = + AgentState0 = get_agent_state(Data0, Agent), + case {State, Version} of + {?waiting_updating, AgentPrevVersion} -> + %% Stale update, ignoring Data0; - true -> - StreamProgresses1 = lists:foldl( - fun(#{stream := Stream, iterator := It}, ProgressesAcc) -> - %% Assert Stream is assigned to Agent - Agent = maps:get(Stream, StreamAssignments), - ProgressesAcc#{Stream => It} - end, - StreamProgresses0, - AgentStreamProgresses - ), - Data0#{stream_progresses => StreamProgresses1} + {?waiting_replaying, AgentVersion} -> + %% Agent finished updating, now replaying + Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), + AgentState1 = update_agent_timeout(AgentState0), + AgentState2 = agent_transition_to_replaying(AgentState1), + set_agent_state(Data1, Agent, AgentState2); + {?replaying, AgentVersion} -> + %% Common case, agent is replaying + Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), + AgentState1 = update_agent_timeout(AgentState0), + set_agent_state(Data1, Agent, AgentState1); + {OtherState, OtherVersion} -> + ?tp(warning, unexpected_update, #{ + agent => Agent, + update_version => OtherVersion, + state => OtherState, + our_agent_version => AgentVersion, + our_agent_prev_version => AgentPrevVersion + }), + drop_invalidate_agent(Data0, Agent) end. 
+%% Store the iterators reported by Agent in `stream_progresses`.
+%% A reported stream is recorded only when `stream_owners` maps it to Agent;
+%% reports for any other stream are skipped.
+update_stream_progresses(
+    #{stream_progresses := StreamProgresses0, stream_owners := StreamOwners} = Data,
+    Agent,
+    ReceivedStreamProgresses
+) ->
+    RecordIfOwned = fun(#{stream := Stream, iterator := It}, Known) ->
+        case StreamOwners of
+            #{Stream := Agent} -> Known#{Stream => It};
+            _ -> Known
+        end
+    end,
+    Data#{
+        stream_progresses => lists:foldl(
+            RecordIfOwned, StreamProgresses0, ReceivedStreamProgresses
+        )
+    }.
+
+%% Split the agent's `revoked_streams` into those the agent reported with
+%% `use_finished := true` and those it did not. The former lose their owner
+%% in Data, the latter stay in the agent state.
+%% Returns {UpdatedAgentState, UpdatedData}.
+clean_revoked_streams(
+    Data0, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses
+) ->
+    ReportedFinished = maps:from_list([
+        {Stream, true}
+     || #{stream := Stream, use_finished := true} <- ReceivedStreamProgresses
+    ]),
+    {Released, Pending} = lists:partition(
+        fun(Stream) -> is_map_key(Stream, ReportedFinished) end,
+        RevokedStreams0
+    ),
+    {AgentState0#{revoked_streams => Pending}, unassign_streams(Data0, Released)}.
+
+%% Remove every stream in Streams from `stream_owners`.
+unassign_streams(#{stream_owners := StreamOwners0} = Data, Streams) ->
+    Data#{
+        stream_owners => lists:foldl(fun maps:remove/2, StreamOwners0, Streams)
+    }.
+
+%%--------------------------------------------------------------------
+%% Handle stream progress updates from agent in updating (VersionOld -> VersionNew) state
+%%
+%% The agent reports the pair (VersionOld, VersionNew). In the
+%% ?waiting_updating and ?updating states the pair must equal the stored
+%% (prev_version, version); in the ?waiting_replaying and ?replaying states
+%% only VersionNew is compared with the stored version. Any other combination
+%% gets the agent dropped and invalidated. Returns the updated leader data.
+
+update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, VersionNew) ->
+    #{state := State, version := AgentVersion, prev_version := AgentPrevVersion} =
+        AgentState0 = get_agent_state(Data0, Agent),
+    case {State, VersionOld, VersionNew} of
+        {?waiting_updating, AgentPrevVersion, AgentVersion} ->
+            %% Client started updating
+            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
+            AgentState1 = update_agent_timeout(AgentState0),
+            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
+            AgentState3 =
+                case AgentState2 of
+                    %% The key is `revoked_streams` (see clean_revoked_streams/3):
+                    %% nothing is left to revoke, so the update is complete.
+                    #{revoked_streams := []} ->
+                        agent_transition_to_waiting_replaying(AgentState2);
+                    _ ->
+                        agent_transition_to_updating(AgentState2)
+                end,
+            set_agent_state(Data2, Agent, AgentState3);
+        {?updating, AgentPrevVersion, AgentVersion} ->
+            %% Client keeps reporting while some streams are still being revoked
+            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
+            AgentState1 = update_agent_timeout(AgentState0),
+            {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
+            AgentState3 =
+                case AgentState2 of
+                    %% Last revoked stream was reported as finished
+                    #{revoked_streams := []} ->
+                        agent_transition_to_waiting_replaying(AgentState2);
+                    _ ->
+                        AgentState2
+                end,
+            set_agent_state(Data2, Agent, AgentState3);
+        {?waiting_replaying, _, AgentVersion} ->
+            %% Only progress and the deadline are refreshed; the state is kept
+            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
+            AgentState1 = update_agent_timeout(AgentState0),
+            set_agent_state(Data1, Agent, AgentState1);
+        {?replaying, _, AgentVersion} ->
+            %% Only progress and the deadline are refreshed; the state is kept
+            Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
+            AgentState1 = update_agent_timeout(AgentState0),
+            set_agent_state(Data1, Agent, AgentState1);
+        {OtherState, OtherVersionOld, OtherVersionNew} ->
+            ?tp(warning, unexpected_update, #{
+                agent => Agent,
+                update_version_old => OtherVersionOld,
+                update_version_new => OtherVersionNew,
+                state => OtherState,
+                our_agent_version => AgentVersion,
+                our_agent_prev_version => AgentPrevVersion
+            }),
+            drop_invalidate_agent(Data0, Agent)
+    end.
+
+%%--------------------------------------------------------------------
+%% Agent state transitions
+%%--------------------------------------------------------------------
+
+%% Start a stream set update for Agent. Only accepts an agent state whose
+%% `prev_version` is undefined. Bumps the version, stores the new Streams and
+%% the RevokedStreams, and sends the update (with the progress of Streams) to
+%% the agent. Returns the new agent state; storing it is left to the caller.
+agent_transition_to_waiting_updating(
+    #{group := Group} = Data,
+    Agent,
+    #{version := Version, prev_version := undefined} = AgentState0,
+    Streams,
+    RevokedStreams
+) ->
+    NewVersion = next_version(Version),
+
+    AgentState1 = AgentState0#{
+        state => ?waiting_updating,
+        streams => Streams,
+        revoked_streams => RevokedStreams,
+        prev_version => Version,
+        version => NewVersion
+    },
+    StreamProgresses = stream_progresses(Data, Streams),
+    ok = emqx_ds_shared_sub_proto:leader_update_streams(
+        Agent, Group, Version, NewVersion, StreamProgresses
+    ),
+    AgentState1.
+
+%% Mark the agent as ?waiting_replaying with no streams left to revoke.
+%% `prev_version` is kept as is.
+agent_transition_to_waiting_replaying(AgentState0) ->
+    AgentState0#{
+        state => ?waiting_replaying,
+        revoked_streams => []
+    }.
+
+%% Build the state of a newly added agent: lease InitialStreams to it at
+%% version 0 and return a ?waiting_replaying agent state with a fresh deadline.
+agent_transition_to_initial_waiting_replaying(
+    #{group := Group} = Data, Agent, InitialStreams
+) ->
+    Version = 0,
+    StreamProgresses = stream_progresses(Data, InitialStreams),
+    Leader = this_leader(Data),
+    ok = emqx_ds_shared_sub_proto:leader_lease_streams(
+        Agent, Group, Leader, StreamProgresses, Version
+    ),
+    #{
+        state => ?waiting_replaying,
+        version => Version,
+        prev_version => undefined,
+        streams => InitialStreams,
+        revoked_streams => [],
+        update_deadline => now_ms() + ?AGENT_TIMEOUT
+    }.
+
+%% ?waiting_replaying -> ?replaying; clears `prev_version`.
+agent_transition_to_replaying(#{state := ?waiting_replaying} = AgentState) ->
+    AgentState#{
+        state => ?replaying,
+        prev_version => undefined
+    }.
+
+%% ?waiting_updating -> ?updating.
+agent_transition_to_updating(#{state := ?waiting_updating} = AgentState) ->
+    AgentState#{state => ?updating}.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
-router_id() ->
+gen_router_id() ->
     emqx_guid:to_hexstr(emqx_guid:gen()).
 
 now_ms() ->
     erlang:system_time(millisecond).
 
-unassigned_streams(#{stream_progresses := StreamProgresses, stream_assignments := StreamAssignments}) ->
+unassigned_streams(#{stream_progresses := StreamProgresses, stream_owners := StreamOwners}) ->
     Streams = maps:keys(StreamProgresses),
-    AssignedStreams = maps:keys(StreamAssignments),
+    AssignedStreams = maps:keys(StreamOwners),
     Streams -- AssignedStreams.
+
+%% Agents whose state is exactly ?replaying, i.e. neither connecting nor
+%% in the middle of an update.
+replaying_agents(#{agents := AgentStates}) ->
+    [Agent || {Agent, #{state := ?replaying}} <- maps:to_list(AgentStates)].
+
+%% Per-agent stream quota: 0 when there are no agents, otherwise
+%% (number of known streams div number of agents) + 1.
+desired_streams_per_agent(#{agents := AgentStates, stream_progresses := StreamProgresses}) ->
+    case maps:size(AgentStates) of
+        0 ->
+            0;
+        AgentCount ->
+            (maps:size(StreamProgresses) div AgentCount) + 1
+    end.
+
+%% Pair every stream in Streams with its stored iterator.
+%% Fails if a stream has no entry in `stream_progresses`.
+stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) ->
+    [
+        #{stream => Stream, iterator => maps:get(Stream, StreamProgresses)}
+     || Stream <- Streams
+    ].
+
+%% Versions are consecutive integers.
+next_version(Version) ->
+    Version + 1.
+
+%% Random permutation: tag each element with a random key, sort by the tag,
+%% then strip the tags.
+shuffle(L0) ->
+    Tagged = lists:sort([{rand:uniform(), Item} || Item <- L0]),
+    [Item || {_Tag, Item} <- Tagged].
+
+%% Make Agent the owner of every stream in Streams.
+set_stream_ownership_to_agent(#{stream_owners := StreamOwners0} = Data, Agent, Streams) ->
+    Assign = fun(Stream, Owners) -> maps:put(Stream, Agent, Owners) end,
+    Data#{
+        stream_owners => lists:foldl(Assign, StreamOwners0, Streams)
+    }.
+
+%% Insert or replace the state stored for Agent.
+set_agent_state(#{agents := Agents} = Data, Agent, AgentState) ->
+    Data#{
+        agents => maps:put(Agent, AgentState, Agents)
+    }.
+
+%% Move the agent's `update_deadline` to now + ?AGENT_TIMEOUT.
+update_agent_timeout(AgentState) ->
+    AgentState#{
+        update_deadline => now_ms() + ?AGENT_TIMEOUT
+    }.
+
+%% State stored for Agent. Fails (maps:get/2) when the agent is not in `agents`.
+get_agent_state(#{agents := Agents} = _Data, Agent) ->
+    maps:get(Agent, Agents).
+
+%% Leader identity handed to agents: the pid of the calling process.
+this_leader(_Data) ->
+    self().
+
+%% Forget Agent: remove the owner of all its streams (both `streams` and
+%% `revoked_streams`) and delete its entry from `agents`.
+drop_agent(#{agents := Agents} = Data0, Agent) ->
+    AgentState = get_agent_state(Data0, Agent),
+    #{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
+    AllStreams = Streams ++ RevokedStreams,
+    Data1 = unassign_streams(Data0, AllStreams),
+    Data1#{agents => maps:remove(Agent, Agents)}.
+
+%% Send the invalidate message of this group to Agent.
+invalidate_agent(#{group := Group}, Agent) ->
+    ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, Group).
+
+%% Drop Agent from the leader data, then notify it. Returns the updated data.
+drop_invalidate_agent(Data0, Agent) ->
+    Data1 = drop_agent(Data0, Agent),
+    ok = invalidate_agent(Data1, Agent),
+    Data1.
+
+%% Call Fun() only if Agent is present in `agents`; otherwise return Data
+%% unchanged.
+with_agent(#{agents := Agents} = Data, Agent, Fun) ->
+    case Agents of
+        #{Agent := _} ->
+            Fun();
+        _ ->
+            Data
+    end.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl
index d9a0b994f..7d81de083 100644
--- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl
@@ -13,9 +13,13 @@
 -export([
     agent_connect_leader/3,
     agent_update_stream_states/4,
+    agent_update_stream_states/5,
 
-    leader_lease_streams/4,
-    leader_renew_stream_lease/3
+    leader_lease_streams/5,
+    leader_renew_stream_lease/3,
+    leader_renew_stream_lease/4,
+    leader_update_streams/5,
+    leader_invalidate/2
 ]).
 
 -type agent() :: pid().
@@ -29,6 +33,12 @@
     iterator := emqx_ds:iterator()
 }.
 
+-type agent_stream_progress() :: #{
+    stream := emqx_ds:stream(),
+    iterator := emqx_ds:iterator(),
+    use_finished := boolean()
+}.
+
 -export_type([
     agent/0,
     leader/0,
@@ -44,20 +54,27 @@ agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
     _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
     ok.
 
--spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok.
+-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
     _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
     ok.
 
-%% ...
+-spec agent_update_stream_states(
+    leader(), agent(), list(agent_stream_progress()), version(), version()
+) -> ok.
+agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
+    _ = erlang:send(
+        ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
+    ),
+    ok.
 
 %% leader -> agent messages
 
--spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
-leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->
+-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok.
+leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
     _ = emqx_persistent_session_ds_shared_subs_agent:send(
         ToAgent,
-        ?leader_lease_streams(OfGroup, Streams, Version)
+        ?leader_lease_streams(OfGroup, Leader, Streams, Version)
     ),
     ok.
 
@@ -69,4 +86,29 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
     ),
     ok.
 
-%% ...
+%% Two-version lease renewal; versions are given in (old, new) order.
+-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
+leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(
+        ToAgent,
+        ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
+    ),
+    ok.
+
+%% Stream set update from VersionOld to VersionNew, carrying StreamsNew.
+-spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok.
+leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(
+        ToAgent,
+        ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
+    ),
+    ok.
+
+%% Sends the invalidate message of OfGroup to the agent.
+-spec leader_invalidate(agent(), group()) -> ok.
+leader_invalidate(ToAgent, OfGroup) ->
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(
+        ToAgent,
+        ?leader_invalidate(OfGroup)
+    ),
+    ok.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl
index c780ab193..6689a0d3b 100644
--- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl
@@ -49,6 +49,22 @@
     agent := Agent
 }).
 
+-define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
+    type => ?agent_update_stream_states_msg,
+    stream_states => StreamStates,
+    version_old => VersionOld,
+    version_new => VersionNew,
+    agent => Agent
+}).
+
+-define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
+    type := ?agent_update_stream_states_msg,
+    stream_states := StreamStates,
+    version_old := VersionOld,
+    version_new := VersionNew,
+    agent := Agent
+}).
+
 %% leader messages, sent from the leader to the agent
 %% Agent may have several shared subscriptions, so may talk to several leaders
 %% `group` field is used to identify the leader.
@@ -56,17 +72,21 @@
 -define(leader_lease_streams_msg, leader_lease_streams).
 -define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
+-define(leader_update_streams_msg, leader_update_streams).
+-define(leader_invalidate_msg, leader_invalidate).
 
--define(leader_lease_streams(Group, Streams, Version), #{
+-define(leader_lease_streams(Group, Leader, Streams, Version), #{
     type => ?leader_lease_streams_msg,
     streams => Streams,
     version => Version,
+    leader => Leader,
     group => Group
 }).
 
--define(leader_lease_streams_match(Group, Streams, Version), #{
+-define(leader_lease_streams_match(Group, Leader, Streams, Version), #{
     type := ?leader_lease_streams_msg,
     streams := Streams,
     version := Version,
+    leader := Leader,
     group := Group
 }).
 
@@ -82,4 +102,47 @@
     group := Group
 }).
 
+%% Two-version lease renewal: carries both version_old and version_new.
+-define(leader_renew_stream_lease(Group, VersionOld, VersionNew), #{
+    type => ?leader_renew_stream_lease_msg,
+    version_old => VersionOld,
+    version_new => VersionNew,
+    group => Group
+}).
+
+-define(leader_renew_stream_lease_match(Group, VersionOld, VersionNew), #{
+    type := ?leader_renew_stream_lease_msg,
+    version_old := VersionOld,
+    version_new := VersionNew,
+    group := Group
+}).
+
+%% Stream set update: carries the old and new versions and the new streams.
+-define(leader_update_streams(Group, VersionOld, VersionNew, StreamsNew), #{
+    type => ?leader_update_streams_msg,
+    version_old => VersionOld,
+    version_new => VersionNew,
+    streams_new => StreamsNew,
+    group => Group
+}).
+
+-define(leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew), #{
+    type := ?leader_update_streams_msg,
+    version_old := VersionOld,
+    version_new := VersionNew,
+    streams_new := StreamsNew,
+    group := Group
+}).
+
+%% Invalidate message: identifies only the group.
+-define(leader_invalidate(Group), #{
+    type => ?leader_invalidate_msg,
+    group => Group
+}).
+
+-define(leader_invalidate_match(Group), #{
+    type := ?leader_invalidate_msg,
+    group := Group
+}).
+
 -endif.