From bca743054bed6c1f0f11cebda671afbb8ac19f6a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 18 Jun 2024 21:03:51 +0300 Subject: [PATCH] feat(queue): implement backbones of queue agent, leader and leader registry --- apps/emqx/src/emqx_persistent_session_ds.erl | 25 +- ...emqx_persistent_session_ds_shared_subs.erl | 74 ++--- ...ersistent_session_ds_shared_subs_agent.erl | 31 +- .../src/emqx_ds_shared_sub.erl | 5 + .../src/emqx_ds_shared_sub.hrl | 10 + .../src/emqx_ds_shared_sub_agent.erl | 187 ++++++----- .../src/emqx_ds_shared_sub_leader.erl | 308 ++++++++++++++++++ .../src/emqx_ds_shared_sub_leader_sup.erl | 47 +++ .../src/emqx_ds_shared_sub_proto.erl | 72 ++++ .../src/emqx_ds_shared_sub_proto.hrl | 86 +++++ .../src/emqx_ds_shared_sub_registry.erl | 90 +++++ 11 files changed, 767 insertions(+), 168 deletions(-) create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.hrl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index cc599eb06..00cafb1c2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -660,30 +660,7 @@ handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := Share %%-------------------------------------------------------------------- shared_sub_opts(SessionId) -> - #{ - session_id => SessionId, - send_funs => #{ - send => fun send_message/2, - send_after => fun send_message_after/3 - } - }. - -send_message(Dest, Msg) -> - case Dest =:= self() of - true -> - erlang:send(Dest, ?session_message(?shared_sub_message(Msg))), - Msg; - false -> - erlang:send(Dest, Msg) - end. - -send_message_after(Time, Dest, Msg) -> - case Dest =:= self() of - true -> - erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))); - false -> - erlang:send_after(Time, Dest, Msg) - end. + #{session_id => SessionId}. bump_last_alive(S0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index c355af0a6..835a2ed12 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -23,23 +23,14 @@ to_map/2 ]). --record(agent_message, { - message :: term() -}). - -type t() :: #{ agent := emqx_persistent_session_ds_shared_subs_agent:t() }. -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type opts() :: #{ - session_id := emqx_persistent_session_ds:id(), - send_funs := #{ - send := fun((pid(), term()) -> term()), - send_after := fun((non_neg_integer(), pid(), term()) -> reference()) - } + session_id := emqx_persistent_session_ds:id() }. --define(agent_message(Msg), #agent_message{message = Msg}). -define(rank_x, rank_shared). -define(rank_y, 0). @@ -107,17 +98,25 @@ on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) -> -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> {emqx_persistent_session_ds_state:t(), t()}. renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> - {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( + {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( Agent0 ), - NewLeasedStreams =/= [] andalso + StreamLeaseEvents =/= [] andalso ?SLOG( - info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams} + info, #{ + msg => shared_subs_new_stream_lease_events, stream_lease_events => StreamLeaseEvents + } ), - S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams), - S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams), + S1 = lists:foldl( + fun + (#{type := lease} = Event, S) -> accept_stream(Event, S); + (#{type := revoke} = Event, S) -> revoke_stream(Event, S) + end, + S0, + StreamLeaseEvents + ), SharedSubS1 = SharedSubS0#{agent => Agent1}, - {S2, SharedSubS1}. + {S1, SharedSubS1}. -spec on_streams_replayed( emqx_persistent_session_ds_state:t(), @@ -147,14 +146,10 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> {emqx_persistent_session_ds_state:t(), t()}. -on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) -> +on_info(S, #{agent := Agent0} = SharedSubS0, Info) -> Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info), SharedSubS1 = SharedSubS0#{agent => Agent1}, - {S, SharedSubS1}; -on_info(S, SharedSubS, _Info) -> - %% TODO - %% Log warning - {S, SharedSubS}. + {S, SharedSubS1}. -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map(). to_map(_S, _SharedSubS) -> @@ -340,39 +335,8 @@ to_agent_subscription(_S, Subscription) -> maps:with([start_time], Subscription). -spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts(). -agent_opts(#{session_id := SessionId, send_funs := SendFuns}) -> - #{ - session_id => SessionId, - send_funs => agent_send_funs(SendFuns) - }. - -agent_send_funs(#{ - send := Send, - send_after := SendAfter -}) -> - #{ - send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end, - send_after => fun(Time, Pid, Msg) -> - send_after_from_agent(SendAfter, Time, Pid, Msg) - end - }. - -send_from_agent(Send, Dest, Msg) -> - case Dest =:= self() of - true -> - Send(Dest, ?agent_message(Msg)), - Msg; - false -> - Send(Dest, Msg) - end. - -send_after_from_agent(SendAfter, Time, Dest, Msg) -> - case Dest =:= self() of - true -> - SendAfter(Time, Dest, ?agent_message(Msg)); - false -> - SendAfter(Time, Dest, Msg) - end. +agent_opts(#{session_id := SessionId}) -> + #{session_id => SessionId}. -dialyzer({nowarn_function, now_ms/0}). now_ms() -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 31a80838f..97b38d0f2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -5,6 +5,8 @@ -module(emqx_persistent_session_ds_shared_subs_agent). -include("shared_subs_agent.hrl"). +-include("emqx_session.hrl"). +-include("session_internals.hrl"). -type session_id() :: emqx_persistent_session_ds:id(). @@ -16,18 +18,15 @@ -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type opts() :: #{ - session_id := session_id(), - send_funs := #{ - send := fun((pid(), term()) -> term()), - send_after := fun((non_neg_integer(), pid(), term()) -> reference()) - } + session_id := session_id() }. %% TODO -%% This records goe through network, we better shrink them +%% This records go through network, we better shrink them %% * use integer keys %% * somehow avoid passing stream and topic_filter — they both are part of the iterator -type stream_lease() :: #{ + type => lease, %% Used as "external" subscription_id topic_filter := topic_filter(), stream := emqx_ds:stream(), @@ -35,10 +34,13 @@ }. -type stream_revoke() :: #{ + type => revoke, topic_filter := topic_filter(), stream := emqx_ds:stream() }. +-type stream_lease_event() :: stream_lease() | stream_revoke(). + -type stream_progress() :: #{ topic_filter := topic_filter(), stream := emqx_ds:stream(), @@ -65,6 +67,11 @@ renew_streams/1 ]). +-export([ + send/2, + send_after/3 +]). + %%-------------------------------------------------------------------- %% Behaviour %%-------------------------------------------------------------------- @@ -74,7 +81,7 @@ -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> {ok, t()} | {error, term()}. -callback on_unsubscribe(t(), topic_filter()) -> t(). --callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback on_stream_progress(t(), [stream_progress()]) -> t(). -callback on_info(t(), term()) -> t(). @@ -99,7 +106,7 @@ on_subscribe(Agent, TopicFilter, SubOpts) -> on_unsubscribe(Agent, TopicFilter) -> ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). --spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-spec renew_streams(t()) -> {[stream_lease_event()], t()}. renew_streams(Agent) -> ?shared_subs_agent:renew_streams(Agent). @@ -110,3 +117,11 @@ on_stream_progress(Agent, StreamProgress) -> -spec on_info(t(), term()) -> t(). on_info(Agent, Info) -> ?shared_subs_agent:on_info(Agent, Info). + +-spec send(pid(), term()) -> term(). +send(Dest, Msg) -> + erlang:send(Dest, ?session_message(?shared_sub_message(Msg))). + +-spec send_after(non_neg_integer(), pid(), term()) -> reference(). +send_after(Time, Dest, Msg) -> + erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl new file mode 100644 index 000000000..95a4b7294 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl @@ -0,0 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.hrl new file mode 100644 index 000000000..b51f7bfd0 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.hrl @@ -0,0 +1,10 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_DS_SHARED_SUB_HRL). +-define(EMQX_DS_SHARED_SUB_HRL, true). + +-define(gproc_id(ID), {n, l, ID}). + +-endif. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 425d5df3b..42a642d54 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -8,6 +8,8 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_ds_shared_sub_proto.hrl"). + -export([ new/1, open/2, @@ -20,6 +22,12 @@ renew_streams/1 ]). +%% Individual subscription state + +-define(connecting, connecting). +-define(replaying, replaying). +% -define(updating, updating). + -behaviour(emqx_persistent_session_ds_shared_subs_agent). %%-------------------------------------------------------------------- @@ -32,8 +40,8 @@ new(Opts) -> open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( - fun({ShareTopicFilter, #{start_time := StartTime}}, State) -> - add_subscription(State, ShareTopicFilter, StartTime) + fun({ShareTopicFilter, #{}}, State) -> + add_subscription(State, ShareTopicFilter) end, State0, TopicSubscriptions @@ -41,23 +49,38 @@ open(TopicSubscriptions, Opts) -> State1. on_subscribe(State0, TopicFilter, _SubOpts) -> - StartTime = now_ms(), - State1 = add_subscription(State0, TopicFilter, StartTime), + State1 = add_subscription(State0, TopicFilter), {ok, State1}. on_unsubscribe(State, TopicFilter) -> delete_subscription(State, TopicFilter). -renew_streams(State0) -> - State1 = do_renew_streams(State0), - {State2, StreamLeases} = stream_leases(State1), - {StreamLeases, [], State2}. +renew_streams(#{} = State) -> + fetch_stream_events(State). on_stream_progress(State, _StreamProgress) -> State. -on_info(State, _Info) -> - State. +on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) -> + case State of + #{subscriptions := #{Group := Sub0} = Subs} -> + Sub1 = handle_leader_lease_streams(Sub0, StreamProgresses, Version), + State#{subscriptions => Subs#{Group => Sub1}}; + _ -> + %% TODO + %% Handle unknown group? + State + end; +on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> + case State of + #{subscriptions := #{Group := Sub0} = Subs} -> + Sub1 = handle_leader_renew_stream_lease(Sub0, Version), + State#{subscriptions => Subs#{Group => Sub1}}; + _ -> + %% TODO + %% Handle unknown group? + State + end. %%-------------------------------------------------------------------- %% Internal functions @@ -65,92 +88,94 @@ on_info(State, _Info) -> init_state(Opts) -> SessionId = maps:get(session_id, Opts), - SendFuns = maps:get(send_funs, Opts), - Send = maps:get(send, SendFuns), - SendAfter = maps:get(send_after, SendFuns), #{ session_id => SessionId, - send => Send, - end_after => SendAfter, subscriptions => #{} }. -% send(State, Pid, Msg) -> -% Send = maps:get(send, State), -% Send(Pid, Msg). +delete_subscription(State, _ShareTopicFilter) -> + %% TODO + State. -% send_after(State, Time, Pid, Msg) -> -% SendAfter = maps:get(send_after, State), -% SendAfter(Time, Pid, Msg). +add_subscription( + #{subscriptions := Subs0} = State0, ShareTopicFilter +) -> + #share{topic = TopicFilter, group = Group} = ShareTopicFilter, + ok = emqx_ds_shared_sub_registry:lookup_leader(this_agent(), TopicFilter), + Subs1 = Subs0#{ + %% TODO + %% State machine is complex, so better move it to a separate module + Group => #{ + state => ?connecting, + topic_filter => ShareTopicFilter, + streams => #{}, + version => undefined, + prev_version => undefined, + stream_lease_events => [] + } + }, + State1 = State0#{subscriptions => Subs1}, + State1. -do_renew_streams(#{subscriptions := Subs0} = State0) -> - Subs1 = maps:map( +fetch_stream_events(#{subscriptions := Subs0} = State0) -> + {Subs1, Events} = lists:foldl( fun( - ShareTopicFilter, - #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub + {_Group, #{stream_lease_events := Events0, topic_filter := TopicFilter} = Sub}, + {SubsAcc, EventsAcc} ) -> - #share{topic = TopicFilterRaw} = ShareTopicFilter, - TopicFilter = emqx_topic:words(TopicFilterRaw), - {_, NewStreams} = lists:unzip( - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) - ), - {Streams1, NewLeases} = lists:foldl( - fun(Stream, {StreamsAcc, LeasesAcc}) -> - case StreamsAcc of - #{Stream := _} -> - {StreamsAcc, LeasesAcc}; - _ -> - {ok, It} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime - ), - StreamLease = #{ - topic_filter => ShareTopicFilter, - stream => Stream, - iterator => It - }, - {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]} - end + Events1 = lists:map( + fun(Event) -> + Event#{topic_filter => TopicFilter} end, - {Streams0, []}, - NewStreams + Events0 ), - Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases} - end, - Subs0 - ), - State0#{subscriptions => Subs1}. - -delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) -> - #share{topic = TopicFilter} = ShareTopicFilter, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), - Subs1 = maps:remove(ShareTopicFilter, Subs0), - State0#{subscriptions => Subs1}. - -stream_leases(#{subscriptions := Subs0} = State0) -> - {Subs1, StreamLeases} = lists:foldl( - fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) -> - {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]} + {SubsAcc#{TopicFilter => Sub#{stream_lease_events => []}}, [Events1 | EventsAcc]} end, {Subs0, []}, maps:to_list(Subs0) ), State1 = State0#{subscriptions => Subs1}, - {State1, lists:concat(StreamLeases)}. + {lists:concat(Events), State1}. -now_ms() -> - erlang:system_time(millisecond). +%%-------------------------------------------------------------------- +%% Handler of leader messages +%%-------------------------------------------------------------------- -add_subscription( - #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime -) -> - #share{topic = TopicFilter} = ShareTopicFilter, - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), - Subs1 = Subs0#{ - ShareTopicFilter => #{ - start_time => StartTime, - streams => #{}, - stream_leases => [] - } - }, - State1 = State0#{subscriptions => Subs1}, - State1. +handle_leader_lease_streams(#{state := ?connecting} = Sub, StreamProgresses, Version) -> + Streams = lists:foldl( + fun(#{stream := Stream, iterator := It}, Acc) -> + Acc#{Stream => It} + end, + #{}, + StreamProgresses + ), + StreamLeaseEvents = lists:map( + fun(#{stream := Stream, iterator := It}) -> + #{ + type => lease, + stream => Stream, + iterator => It + } + end, + StreamProgresses + ), + Sub#{ + state => ?replaying, + streams => Streams, + stream_lease_events => StreamLeaseEvents, + version => Version, + last_update_time => erlang:monotonic_time(millisecond) + }. + +handle_leader_renew_stream_lease(#{state := ?replaying, version := Version} = Sub, Version) -> + Sub#{ + last_update_time => erlang:monotonic_time(millisecond) + }; +handle_leader_renew_stream_lease(Sub, _Version) -> + Sub. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +this_agent() -> self(). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl new file mode 100644 index 000000000..dea8dcf98 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -0,0 +1,308 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_leader). + +-behaviour(gen_statem). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_persistent_message.hrl"). +-include("emqx_ds_shared_sub_proto.hrl"). + +-export([ + register/2, + + start_link/1, + child_spec/1, + id/1, + + callback_mode/0, + init/1, + handle_event/4, + terminate/3 +]). + +-type options() :: #{ + topic_filter := emqx_persistent_session_ds:share_topic_filter() +}. + +-type stream_assignment() :: #{ + prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), + version := emqx_ds_shared_sub_proto:version(), + streams := list(emqx_ds:stream()) +}. + +-type data() :: #{ + group := emqx_types:group(), + topic := emqx_types:topic(), + %% For ds router, not an actual session_id + router_id := binary(), + %% TODO + %% Persist progress + %% TODO + %% Implement some stats to assign evenly? + stream_progresses := #{ + emqx_ds:stream() => emqx_ds:iterator() + }, + agent_stream_assignments := #{ + emqx_ds_shared_sub_proto:agent() => stream_assignment() + }, + stream_assignments := #{ + emqx_ds:stream() => emqx_ds_shared_sub_proto:agent() + } +}. + +-export_type([ + options/0, + data/0 +]). + +%% States + +-define(waiting_registration, waiting_registration). +-define(replaying, replaying). + +%% Events + +-record(register, { + register_fun :: fun(() -> pid()) +}). +-record(renew_streams, {}). +-record(renew_leases, {}). + +%% Constants + +%% TODO +%% Move to settings +-define(RENEW_LEASE_INTERVAL, 5000). +-define(RENEW_STREAMS_INTERVAL, 5000). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +register(Pid, Fun) -> + gen_statem:call(Pid, #register{register_fun = Fun}). + +%%-------------------------------------------------------------------- +%% Internal API +%%-------------------------------------------------------------------- + +child_spec(#{topic_filter := TopicFilter} = Options) -> + #{ + id => id(TopicFilter), + start => {?MODULE, start_link, [Options]}, + restart => temporary, + shutdown => 5000, + type => worker + }. + +start_link(Options) -> + gen_statem:start_link(?MODULE, [Options], []). + +id(#share{group = Group} = _TopicFilter) -> + {?MODULE, Group}. + +%%-------------------------------------------------------------------- +%% gen_statem callbacks +%%-------------------------------------------------------------------- + +callback_mode() -> handle_event_function. + +init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> + Data = #{ + group => Group, + topic => Topic, + router_id => router_id(), + stream_progresses => #{}, + stream_assignments => #{} + }, + {ok, ?waiting_registration, Data}. + +%%-------------------------------------------------------------------- +%% waiting_registration state + +handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) -> + Self = self(), + case Fun() of + Self -> + {next_state, ?replaying, Data, {reply, From, {ok, Self}}}; + OtherPid -> + {stop_and_reply, normal, {reply, From, {ok, OtherPid}}} + end; +%%-------------------------------------------------------------------- +%% repalying state +handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) -> + ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId), + {keep_state_and_data, [ + {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}, + {state_timeout, 0, #renew_streams{}} + ]}; +handle_event(state_timeout, #renew_streams{}, ?replaying, Data0) -> + Data1 = renew_streams(Data0), + {keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}}; +handle_event(state_timeout, #renew_leases{}, ?replaying, Data0) -> + Data1 = renew_leases(Data0), + {keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}}; +handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) -> + Data1 = connect_agent(Data0, Agent), + {keep_state, Data1}; +handle_event( + info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0 +) -> + Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version), + {keep_state, Data1}; +%%-------------------------------------------------------------------- +%% fallback +handle_event(enter, _OldState, _State, _Data) -> + keep_state_and_data; +handle_event(Event, _Content, State, _Data) -> + ?SLOG(warning, #{ + msg => unexpected_event, + event => Event, + state => State + }), + keep_state_and_data. + +terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> + ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) -> + TopicFilter = emqx_topic:words(Topic), + StartTime = now_ms(), + {_, Streams} = lists:unzip( + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, now_ms()) + ), + %% TODO + %% Handle stream removal + NewProgresses = lists:foldl( + fun(Stream, ProgressesAcc) -> + case ProgressesAcc of + #{Stream := _} -> + ProgressesAcc; + _ -> + {ok, It} = emqx_ds:make_iterator( + ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime + ), + ProgressesAcc#{Stream => It} + end + end, + Progresses, + Streams + ), + %% TODO + %% Initiate reassigment + Data0#{stream_progresses => NewProgresses}. + +%% TODO +%% This just gives unassigned streams to connecting agent, +%% we need to implement actual stream (re)assignment. +connect_agent( + #{ + group := Group, + agent_stream_assignments := AgentStreamAssignments0, + stream_assignments := StreamAssignments0, + stream_progresses := StreamProgresses + } = Data0, + Agent +) -> + {AgentStreamAssignments, StreamAssignments} = + case AgentStreamAssignments0 of + #{Agent := _} -> + {AgentStreamAssignments0, StreamAssignments0}; + _ -> + UnassignedStreams = unassigned_streams(Data0), + Version = 0, + StreamAssignment = #{ + prev_version => undefined, + version => Version, + streams => UnassignedStreams + }, + AgentStreamAssignments1 = AgentStreamAssignments0#{Agent => StreamAssignment}, + StreamAssignments1 = lists:foldl( + fun(Stream, Acc) -> + Acc#{Stream => Agent} + end, + StreamAssignments0, + UnassignedStreams + ), + StreamLease = lists:map( + fun(Stream) -> + #{ + stream => Stream, + iterator => maps:get(Stream, StreamProgresses) + } + end, + UnassignedStreams + ), + ok = emqx_ds_shared_sub_proto:leader_lease_streams( + Agent, Group, StreamLease, Version + ), + {AgentStreamAssignments1, StreamAssignments1} + end, + Data0#{ + agent_stream_assignments => AgentStreamAssignments, stream_assignments => StreamAssignments + }. + +renew_leases(#{group := Group, agent_stream_assignments := AgentStreamAssignments} = Data) -> + ok = lists:foreach( + fun({Agent, #{version := Version}}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version) + end, + maps:to_list(AgentStreamAssignments) + ), + Data. + +update_agent_stream_states( + #{ + agent_stream_assignments := AgentStreamAssignments, + stream_assignments := StreamAssignments, + stream_progresses := StreamProgresses0 + } = Data0, + Agent, + AgentStreamProgresses, + Version +) -> + AgentVersion = emqx_utils_maps:deep_get([Agent, version], AgentStreamAssignments, undefined), + AgentPrevVersion = emqx_utils_maps:deep_get( + [Agent, prev_version], AgentStreamAssignments, undefined + ), + case AgentVersion == Version orelse AgentPrevVersion == Version of + false -> + %% TODO + %% send invalidate to agent + Data0; + true -> + StreamProgresses1 = lists:foldl( + fun(#{stream := Stream, iterator := It}, ProgressesAcc) -> + %% Assert Stream is assigned to Agent + Agent = maps:get(Stream, StreamAssignments), + ProgressesAcc#{Stream => It} + end, + StreamProgresses0, + AgentStreamProgresses + ), + Data0#{stream_progresses => StreamProgresses1} + end. + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +router_id() -> + emqx_guid:to_hexstr(emqx_guid:gen()). + +now_ms() -> + erlang:system_time(millisecond). + +unassigned_streams(#{stream_progresses := StreamProgresses, stream_assignments := StreamAssignments}) -> + Streams = maps:keys(StreamProgresses), + AssignedStreams = maps:keys(StreamAssignments), + Streams -- AssignedStreams. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl new file mode 100644 index 000000000..ea4111391 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_leader_sup). + +-behaviour(supervisor). + +%% API +-export([ + start_link/0, + start_leader/1, + stop_leader/1 +]). + +%% supervisor behaviour callbacks +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link() -> supervisor:startlink_ret(). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +-spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret(). +start_leader(Options) -> + ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options), + supervisor:start_child(?MODULE, ChildSpec). + +-spec stop_leader(emqx_ds_shared_sub_leader:topic_filter()) -> ok | {error, term()}. +stop_leader(TopicFilter) -> + supervisor:terminate_child(?MODULE, emqx_ds_shared_sub_leader:id(TopicFilter)). + +%%------------------------------------------------------------------------------ +%% supervisor behaviour callbacks +%%------------------------------------------------------------------------------ + +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl new file mode 100644 index 000000000..b513d024f --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -0,0 +1,72 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% TODO +%% This should be wrapped with a proto_v1 module. +%% For simplicity, send as simple OTP messages for now. + +-module(emqx_ds_shared_sub_proto). + +-include("emqx_ds_shared_sub_proto.hrl"). + +-export([ + agent_connect_leader/3, + agent_update_stream_states/4, + + leader_lease_streams/4, + leader_renew_stream_lease/3 +]). + +-type agent() :: pid(). +-type leader() :: pid(). +-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). +-type group() :: emqx_types:group(). +-type version() :: non_neg_integer(). + +-type stream_progress() :: #{ + stream := emqx_ds:stream(), + iterator := emqx_ds:iterator() +}. + +-export_type([ + agent/0, + leader/0, + group/0, + version/0, + stream_progress/0 +]). + +%% agent messages + +-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. +agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> + _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), + ok. + +-spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok. +agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> + _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), + ok. + +%% ... + +%% leader messages + +-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok. +leader_lease_streams(ToAgent, OfGroup, Streams, Version) -> + _ = emqx_persistent_session_ds_shared_subs_agent:send( + ToAgent, + ?leader_lease_streams(OfGroup, Streams, Version) + ), + ok. + +-spec leader_renew_stream_lease(agent(), group(), version()) -> ok. +leader_renew_stream_lease(ToAgent, OfGroup, Version) -> + _ = emqx_persistent_session_ds_shared_subs_agent:send( + ToAgent, + ?leader_renew_stream_lease(OfGroup, Version) + ), + ok. + +%% ... diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl new file mode 100644 index 000000000..2bb978c8b --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -0,0 +1,86 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc Asynchronous messages between shared sub agent and shared sub leader + +-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL). +-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true). + +%% TODO +%% Make integer keys on GA + +%% NOTE +%% We do not need any kind of request/response identification, +%% because the protocol is fully event-based. + +%% agent messages, sent from agent side to the leader + +-define(agent_connect_leader_msg, agent_connect_leader). +-define(agent_update_stream_states_msg, agent_update_stream_states). +-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout). +-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout). + +%% Agent messages sent to the leader. +%% Leader talks to many agents, `agent` field is used to identify the sender. + +-define(agent_connect_leader(Agent, TopicFilter), #{ + type => ?agent_connect_leader_msg, + topic_filter => TopicFilter, + agent => Agent +}). + +-define(agent_connect_leader_match(Agent, TopicFilter), #{ + type := ?agent_connect_leader_msg, + topic_filter := TopicFilter, + agent := Agent +}). + +-define(agent_update_stream_states(Agent, StreamStates, Version), #{ + type => ?agent_update_stream_states_msg, + stream_states => StreamStates, + version => Version, + agent => Agent +}). + +-define(agent_update_stream_states_match(Agent, StreamStates, Version), #{ + type := ?agent_update_stream_states_msg, + stream_states := StreamStates, + version := Version, + agent := Agent +}). + +%% leader messages, sent from the leader to the agent +%% Agent may have several shared subscriptions, so may talk to several leaders +%% `group` field is used to identify the leader. + +-define(leader_lease_streams_msg, leader_lease_streams). +-define(leader_renew_stream_lease_msg, leader_renew_stream_lease). + +-define(leader_lease_streams(Group, Streams, Version), #{ + type => ?leader_lease_streams_msg, + streams => Streams, + version => Version, + group => Group +}). + +-define(leader_lease_streams_match(Group, Streams, Version), #{ + type := ?leader_lease_streams_msg, + streams := Streams, + version := Version, + group := Group +}). + +-define(leader_renew_stream_lease(Group, Version), #{ + type => ?leader_renew_stream_lease_msg, + version => Version, + group => Group +}). + +-define(leader_renew_stream_lease_match(Group, Version), #{ + type := ?leader_renew_stream_lease_msg, + version := Version, + group := Group +}). + +-endif. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl new file mode 100644 index 000000000..6dcd86c4d --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_registry). + +-behaviour(gen_server). + +-include_lib("emqx/include/emqx.hrl"). +-include("emqx_ds_shared_sub.hrl"). + +-export([ + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-export([ + lookup_leader/2 +]). + +-record(lookup_leader, { + agent :: emqx_ds_shared_sub:agent(), + topic_filter :: emqx_persistent_session_ds:share_topic_filter() +}). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +lookup_leader(Agent, TopicFilter) -> + gen_server:cast(?MODULE, #lookup_leader{agent = Agent, topic_filter = TopicFilter}). + +%%-------------------------------------------------------------------- +%% Internal API +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + {ok, #{}}. + +handle_call(_Request, _From, State) -> + {reply, {error, unknown_request}, State}. + +handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) -> + State1 = do_lookup_leader(Agent, TopicFilter, State), + {noreply, State1}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +do_lookup_leader(Agent, TopicFilter, State) -> + %% TODO + %% Cluster-wide unique leader election should be implemented + Id = emqx_ds_shared_sub_leader:id(TopicFilter), + LeaderPid = + case gproc:where(?gproc_id(Id)) of + undefined -> + {ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{ + topic_filter => TopicFilter + }), + {ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register( + Pid, + fun() -> + {LPid, _} = gproc:reg_or_locate(?gproc_id(Id)), + LPid + end + ), + NewLeaderPid; + Pid -> + Pid + end, + ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter), + State.