feat(queue): implement backbones of queue agent, leader and leader registry

This commit is contained in:
Ilya Averyanov 2024-06-18 21:03:51 +03:00
parent f5eb3e7471
commit bca743054b
11 changed files with 767 additions and 168 deletions

View File

@ -660,30 +660,7 @@ handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := Share
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
shared_sub_opts(SessionId) -> shared_sub_opts(SessionId) ->
#{ #{session_id => SessionId}.
session_id => SessionId,
send_funs => #{
send => fun send_message/2,
send_after => fun send_message_after/3
}
}.
send_message(Dest, Msg) ->
case Dest =:= self() of
true ->
erlang:send(Dest, ?session_message(?shared_sub_message(Msg))),
Msg;
false ->
erlang:send(Dest, Msg)
end.
send_message_after(Time, Dest, Msg) ->
case Dest =:= self() of
true ->
erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg)));
false ->
erlang:send_after(Time, Dest, Msg)
end.
bump_last_alive(S0) -> bump_last_alive(S0) ->
%% Note: we take a pessimistic approach here and assume that the client will be alive %% Note: we take a pessimistic approach here and assume that the client will be alive

View File

@ -23,23 +23,14 @@
to_map/2 to_map/2
]). ]).
-record(agent_message, {
message :: term()
}).
-type t() :: #{ -type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t() agent := emqx_persistent_session_ds_shared_subs_agent:t()
}. }.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{ -type opts() :: #{
session_id := emqx_persistent_session_ds:id(), session_id := emqx_persistent_session_ds:id()
send_funs := #{
send := fun((pid(), term()) -> term()),
send_after := fun((non_neg_integer(), pid(), term()) -> reference())
}
}. }.
-define(agent_message(Msg), #agent_message{message = Msg}).
-define(rank_x, rank_shared). -define(rank_x, rank_shared).
-define(rank_y, 0). -define(rank_y, 0).
@ -107,17 +98,25 @@ on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}. {emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
{NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0 Agent0
), ),
NewLeasedStreams =/= [] andalso StreamLeaseEvents =/= [] andalso
?SLOG( ?SLOG(
info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams} info, #{
msg => shared_subs_new_stream_lease_events, stream_lease_events => StreamLeaseEvents
}
),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
), ),
S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams),
S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams),
SharedSubS1 = SharedSubS0#{agent => Agent1}, SharedSubS1 = SharedSubS0#{agent => Agent1},
{S2, SharedSubS1}. {S1, SharedSubS1}.
-spec on_streams_replayed( -spec on_streams_replayed(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds_state:t(),
@ -147,14 +146,10 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}. {emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) -> on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1}, SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}; {S, SharedSubS1}.
on_info(S, SharedSubS, _Info) ->
%% TODO
%% Log warning
{S, SharedSubS}.
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map(). -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(_S, _SharedSubS) -> to_map(_S, _SharedSubS) ->
@ -340,39 +335,8 @@ to_agent_subscription(_S, Subscription) ->
maps:with([start_time], Subscription). maps:with([start_time], Subscription).
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts(). -spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
agent_opts(#{session_id := SessionId, send_funs := SendFuns}) -> agent_opts(#{session_id := SessionId}) ->
#{ #{session_id => SessionId}.
session_id => SessionId,
send_funs => agent_send_funs(SendFuns)
}.
agent_send_funs(#{
send := Send,
send_after := SendAfter
}) ->
#{
send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end,
send_after => fun(Time, Pid, Msg) ->
send_after_from_agent(SendAfter, Time, Pid, Msg)
end
}.
send_from_agent(Send, Dest, Msg) ->
case Dest =:= self() of
true ->
Send(Dest, ?agent_message(Msg)),
Msg;
false ->
Send(Dest, Msg)
end.
send_after_from_agent(SendAfter, Time, Dest, Msg) ->
case Dest =:= self() of
true ->
SendAfter(Time, Dest, ?agent_message(Msg));
false ->
SendAfter(Time, Dest, Msg)
end.
-dialyzer({nowarn_function, now_ms/0}). -dialyzer({nowarn_function, now_ms/0}).
now_ms() -> now_ms() ->

View File

@ -5,6 +5,8 @@
-module(emqx_persistent_session_ds_shared_subs_agent). -module(emqx_persistent_session_ds_shared_subs_agent).
-include("shared_subs_agent.hrl"). -include("shared_subs_agent.hrl").
-include("emqx_session.hrl").
-include("session_internals.hrl").
-type session_id() :: emqx_persistent_session_ds:id(). -type session_id() :: emqx_persistent_session_ds:id().
@ -16,18 +18,15 @@
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{ -type opts() :: #{
session_id := session_id(), session_id := session_id()
send_funs := #{
send := fun((pid(), term()) -> term()),
send_after := fun((non_neg_integer(), pid(), term()) -> reference())
}
}. }.
%% TODO %% TODO
%% This records goe through network, we better shrink them %% This records go through network, we better shrink them
%% * use integer keys %% * use integer keys
%% * somehow avoid passing stream and topic_filter they both are part of the iterator %% * somehow avoid passing stream and topic_filter they both are part of the iterator
-type stream_lease() :: #{ -type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id %% Used as "external" subscription_id
topic_filter := topic_filter(), topic_filter := topic_filter(),
stream := emqx_ds:stream(), stream := emqx_ds:stream(),
@ -35,10 +34,13 @@
}. }.
-type stream_revoke() :: #{ -type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(), topic_filter := topic_filter(),
stream := emqx_ds:stream() stream := emqx_ds:stream()
}. }.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{ -type stream_progress() :: #{
topic_filter := topic_filter(), topic_filter := topic_filter(),
stream := emqx_ds:stream(), stream := emqx_ds:stream(),
@ -65,6 +67,11 @@
renew_streams/1 renew_streams/1
]). ]).
-export([
send/2,
send_after/3
]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Behaviour %% Behaviour
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -74,7 +81,7 @@
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}. {ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t(). -callback on_unsubscribe(t(), topic_filter()) -> t().
-callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. -callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t(). -callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_info(t(), term()) -> t(). -callback on_info(t(), term()) -> t().
@ -99,7 +106,7 @@ on_subscribe(Agent, TopicFilter, SubOpts) ->
on_unsubscribe(Agent, TopicFilter) -> on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. -spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) -> renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent). ?shared_subs_agent:renew_streams(Agent).
@ -110,3 +117,11 @@ on_stream_progress(Agent, StreamProgress) ->
-spec on_info(t(), term()) -> t(). -spec on_info(t(), term()) -> t().
on_info(Agent, Info) -> on_info(Agent, Info) ->
?shared_subs_agent:on_info(Agent, Info). ?shared_subs_agent:on_info(Agent, Info).
-spec send(pid(), term()) -> term().
send(Dest, Msg) ->
erlang:send(Dest, ?session_message(?shared_sub_message(Msg))).
-spec send_after(non_neg_integer(), pid(), term()) -> reference().
send_after(Time, Dest, Msg) ->
erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))).

View File

@ -0,0 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub).

View File

@ -0,0 +1,10 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(EMQX_DS_SHARED_SUB_HRL).
-define(EMQX_DS_SHARED_SUB_HRL, true).
-define(gproc_id(ID), {n, l, ID}).
-endif.

View File

@ -8,6 +8,8 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_ds_shared_sub_proto.hrl").
-export([ -export([
new/1, new/1,
open/2, open/2,
@ -20,6 +22,12 @@
renew_streams/1 renew_streams/1
]). ]).
%% Individual subscription state
-define(connecting, connecting).
-define(replaying, replaying).
% -define(updating, updating).
-behaviour(emqx_persistent_session_ds_shared_subs_agent). -behaviour(emqx_persistent_session_ds_shared_subs_agent).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -32,8 +40,8 @@ new(Opts) ->
open(TopicSubscriptions, Opts) -> open(TopicSubscriptions, Opts) ->
State0 = init_state(Opts), State0 = init_state(Opts),
State1 = lists:foldl( State1 = lists:foldl(
fun({ShareTopicFilter, #{start_time := StartTime}}, State) -> fun({ShareTopicFilter, #{}}, State) ->
add_subscription(State, ShareTopicFilter, StartTime) add_subscription(State, ShareTopicFilter)
end, end,
State0, State0,
TopicSubscriptions TopicSubscriptions
@ -41,23 +49,38 @@ open(TopicSubscriptions, Opts) ->
State1. State1.
on_subscribe(State0, TopicFilter, _SubOpts) -> on_subscribe(State0, TopicFilter, _SubOpts) ->
StartTime = now_ms(), State1 = add_subscription(State0, TopicFilter),
State1 = add_subscription(State0, TopicFilter, StartTime),
{ok, State1}. {ok, State1}.
on_unsubscribe(State, TopicFilter) -> on_unsubscribe(State, TopicFilter) ->
delete_subscription(State, TopicFilter). delete_subscription(State, TopicFilter).
renew_streams(State0) -> renew_streams(#{} = State) ->
State1 = do_renew_streams(State0), fetch_stream_events(State).
{State2, StreamLeases} = stream_leases(State1),
{StreamLeases, [], State2}.
on_stream_progress(State, _StreamProgress) -> on_stream_progress(State, _StreamProgress) ->
State. State.
on_info(State, _Info) -> on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) ->
State. case State of
#{subscriptions := #{Group := Sub0} = Subs} ->
Sub1 = handle_leader_lease_streams(Sub0, StreamProgresses, Version),
State#{subscriptions => Subs#{Group => Sub1}};
_ ->
%% TODO
%% Handle unknown group?
State
end;
on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
case State of
#{subscriptions := #{Group := Sub0} = Subs} ->
Sub1 = handle_leader_renew_stream_lease(Sub0, Version),
State#{subscriptions => Subs#{Group => Sub1}};
_ ->
%% TODO
%% Handle unknown group?
State
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
@ -65,92 +88,94 @@ on_info(State, _Info) ->
init_state(Opts) -> init_state(Opts) ->
SessionId = maps:get(session_id, Opts), SessionId = maps:get(session_id, Opts),
SendFuns = maps:get(send_funs, Opts),
Send = maps:get(send, SendFuns),
SendAfter = maps:get(send_after, SendFuns),
#{ #{
session_id => SessionId, session_id => SessionId,
send => Send,
end_after => SendAfter,
subscriptions => #{} subscriptions => #{}
}. }.
% send(State, Pid, Msg) -> delete_subscription(State, _ShareTopicFilter) ->
% Send = maps:get(send, State), %% TODO
% Send(Pid, Msg). State.
% send_after(State, Time, Pid, Msg) -> add_subscription(
% SendAfter = maps:get(send_after, State), #{subscriptions := Subs0} = State0, ShareTopicFilter
% SendAfter(Time, Pid, Msg).
do_renew_streams(#{subscriptions := Subs0} = State0) ->
Subs1 = maps:map(
fun(
ShareTopicFilter,
#{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub
) -> ) ->
#share{topic = TopicFilterRaw} = ShareTopicFilter, #share{topic = TopicFilter, group = Group} = ShareTopicFilter,
TopicFilter = emqx_topic:words(TopicFilterRaw), ok = emqx_ds_shared_sub_registry:lookup_leader(this_agent(), TopicFilter),
{_, NewStreams} = lists:unzip( Subs1 = Subs0#{
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) %% TODO
), %% State machine is complex, so better move it to a separate module
{Streams1, NewLeases} = lists:foldl( Group => #{
fun(Stream, {StreamsAcc, LeasesAcc}) -> state => ?connecting,
case StreamsAcc of
#{Stream := _} ->
{StreamsAcc, LeasesAcc};
_ ->
{ok, It} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),
StreamLease = #{
topic_filter => ShareTopicFilter, topic_filter => ShareTopicFilter,
stream => Stream, streams => #{},
iterator => It version => undefined,
prev_version => undefined,
stream_lease_events => []
}
}, },
{StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]} State1 = State0#{subscriptions => Subs1},
end State1.
end,
{Streams0, []},
NewStreams
),
Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases}
end,
Subs0
),
State0#{subscriptions => Subs1}.
delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) -> fetch_stream_events(#{subscriptions := Subs0} = State0) ->
#share{topic = TopicFilter} = ShareTopicFilter, {Subs1, Events} = lists:foldl(
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), fun(
Subs1 = maps:remove(ShareTopicFilter, Subs0), {_Group, #{stream_lease_events := Events0, topic_filter := TopicFilter} = Sub},
State0#{subscriptions => Subs1}. {SubsAcc, EventsAcc}
) ->
stream_leases(#{subscriptions := Subs0} = State0) -> Events1 = lists:map(
{Subs1, StreamLeases} = lists:foldl( fun(Event) ->
fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) -> Event#{topic_filter => TopicFilter}
{SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]} end,
Events0
),
{SubsAcc#{TopicFilter => Sub#{stream_lease_events => []}}, [Events1 | EventsAcc]}
end, end,
{Subs0, []}, {Subs0, []},
maps:to_list(Subs0) maps:to_list(Subs0)
), ),
State1 = State0#{subscriptions => Subs1}, State1 = State0#{subscriptions => Subs1},
{State1, lists:concat(StreamLeases)}. {lists:concat(Events), State1}.
now_ms() -> %%--------------------------------------------------------------------
erlang:system_time(millisecond). %% Handler of leader messages
%%--------------------------------------------------------------------
add_subscription( handle_leader_lease_streams(#{state := ?connecting} = Sub, StreamProgresses, Version) ->
#{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime Streams = lists:foldl(
) -> fun(#{stream := Stream, iterator := It}, Acc) ->
#share{topic = TopicFilter} = ShareTopicFilter, Acc#{Stream => It}
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), end,
Subs1 = Subs0#{ #{},
ShareTopicFilter => #{ StreamProgresses
start_time => StartTime, ),
streams => #{}, StreamLeaseEvents = lists:map(
stream_leases => [] fun(#{stream := Stream, iterator := It}) ->
#{
type => lease,
stream => Stream,
iterator => It
} }
}, end,
State1 = State0#{subscriptions => Subs1}, StreamProgresses
State1. ),
Sub#{
state => ?replaying,
streams => Streams,
stream_lease_events => StreamLeaseEvents,
version => Version,
last_update_time => erlang:monotonic_time(millisecond)
}.
handle_leader_renew_stream_lease(#{state := ?replaying, version := Version} = Sub, Version) ->
Sub#{
last_update_time => erlang:monotonic_time(millisecond)
};
handle_leader_renew_stream_lease(Sub, _Version) ->
Sub.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
this_agent() -> self().

View File

@ -0,0 +1,308 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_leader).
-behaviour(gen_statem).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_persistent_message.hrl").
-include("emqx_ds_shared_sub_proto.hrl").
-export([
register/2,
start_link/1,
child_spec/1,
id/1,
callback_mode/0,
init/1,
handle_event/4,
terminate/3
]).
-type options() :: #{
topic_filter := emqx_persistent_session_ds:share_topic_filter()
}.
-type stream_assignment() :: #{
prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
version := emqx_ds_shared_sub_proto:version(),
streams := list(emqx_ds:stream())
}.
-type data() :: #{
group := emqx_types:group(),
topic := emqx_types:topic(),
%% For ds router, not an actual session_id
router_id := binary(),
%% TODO
%% Persist progress
%% TODO
%% Implement some stats to assign evenly?
stream_progresses := #{
emqx_ds:stream() => emqx_ds:iterator()
},
agent_stream_assignments := #{
emqx_ds_shared_sub_proto:agent() => stream_assignment()
},
stream_assignments := #{
emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
}
}.
-export_type([
options/0,
data/0
]).
%% States
-define(waiting_registration, waiting_registration).
-define(replaying, replaying).
%% Events
-record(register, {
register_fun :: fun(() -> pid())
}).
-record(renew_streams, {}).
-record(renew_leases, {}).
%% Constants
%% TODO
%% Move to settings
-define(RENEW_LEASE_INTERVAL, 5000).
-define(RENEW_STREAMS_INTERVAL, 5000).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
register(Pid, Fun) ->
gen_statem:call(Pid, #register{register_fun = Fun}).
%%--------------------------------------------------------------------
%% Internal API
%%--------------------------------------------------------------------
child_spec(#{topic_filter := TopicFilter} = Options) ->
#{
id => id(TopicFilter),
start => {?MODULE, start_link, [Options]},
restart => temporary,
shutdown => 5000,
type => worker
}.
start_link(Options) ->
gen_statem:start_link(?MODULE, [Options], []).
id(#share{group = Group} = _TopicFilter) ->
{?MODULE, Group}.
%%--------------------------------------------------------------------
%% gen_statem callbacks
%%--------------------------------------------------------------------
callback_mode() -> handle_event_function.
init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
Data = #{
group => Group,
topic => Topic,
router_id => router_id(),
stream_progresses => #{},
stream_assignments => #{}
},
{ok, ?waiting_registration, Data}.
%%--------------------------------------------------------------------
%% waiting_registration state
handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) ->
Self = self(),
case Fun() of
Self ->
{next_state, ?replaying, Data, {reply, From, {ok, Self}}};
OtherPid ->
{stop_and_reply, normal, {reply, From, {ok, OtherPid}}}
end;
%%--------------------------------------------------------------------
%% repalying state
handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
{keep_state_and_data, [
{state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
{state_timeout, 0, #renew_streams{}}
]};
handle_event(state_timeout, #renew_streams{}, ?replaying, Data0) ->
Data1 = renew_streams(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
handle_event(state_timeout, #renew_leases{}, ?replaying, Data0) ->
Data1 = renew_leases(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) ->
Data1 = connect_agent(Data0, Agent),
{keep_state, Data1};
handle_event(
info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0
) ->
Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version),
{keep_state, Data1};
%%--------------------------------------------------------------------
%% fallback
handle_event(enter, _OldState, _State, _Data) ->
keep_state_and_data;
handle_event(Event, _Content, State, _Data) ->
?SLOG(warning, #{
msg => unexpected_event,
event => Event,
state => State
}),
keep_state_and_data.
terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
TopicFilter = emqx_topic:words(Topic),
StartTime = now_ms(),
{_, Streams} = lists:unzip(
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, now_ms())
),
%% TODO
%% Handle stream removal
NewProgresses = lists:foldl(
fun(Stream, ProgressesAcc) ->
case ProgressesAcc of
#{Stream := _} ->
ProgressesAcc;
_ ->
{ok, It} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),
ProgressesAcc#{Stream => It}
end
end,
Progresses,
Streams
),
%% TODO
%% Initiate reassigment
Data0#{stream_progresses => NewProgresses}.
%% TODO
%% This just gives unassigned streams to connecting agent,
%% we need to implement actual stream (re)assignment.
connect_agent(
#{
group := Group,
agent_stream_assignments := AgentStreamAssignments0,
stream_assignments := StreamAssignments0,
stream_progresses := StreamProgresses
} = Data0,
Agent
) ->
{AgentStreamAssignments, StreamAssignments} =
case AgentStreamAssignments0 of
#{Agent := _} ->
{AgentStreamAssignments0, StreamAssignments0};
_ ->
UnassignedStreams = unassigned_streams(Data0),
Version = 0,
StreamAssignment = #{
prev_version => undefined,
version => Version,
streams => UnassignedStreams
},
AgentStreamAssignments1 = AgentStreamAssignments0#{Agent => StreamAssignment},
StreamAssignments1 = lists:foldl(
fun(Stream, Acc) ->
Acc#{Stream => Agent}
end,
StreamAssignments0,
UnassignedStreams
),
StreamLease = lists:map(
fun(Stream) ->
#{
stream => Stream,
iterator => maps:get(Stream, StreamProgresses)
}
end,
UnassignedStreams
),
ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, StreamLease, Version
),
{AgentStreamAssignments1, StreamAssignments1}
end,
Data0#{
agent_stream_assignments => AgentStreamAssignments, stream_assignments => StreamAssignments
}.
renew_leases(#{group := Group, agent_stream_assignments := AgentStreamAssignments} = Data) ->
ok = lists:foreach(
fun({Agent, #{version := Version}}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version)
end,
maps:to_list(AgentStreamAssignments)
),
Data.
update_agent_stream_states(
#{
agent_stream_assignments := AgentStreamAssignments,
stream_assignments := StreamAssignments,
stream_progresses := StreamProgresses0
} = Data0,
Agent,
AgentStreamProgresses,
Version
) ->
AgentVersion = emqx_utils_maps:deep_get([Agent, version], AgentStreamAssignments, undefined),
AgentPrevVersion = emqx_utils_maps:deep_get(
[Agent, prev_version], AgentStreamAssignments, undefined
),
case AgentVersion == Version orelse AgentPrevVersion == Version of
false ->
%% TODO
%% send invalidate to agent
Data0;
true ->
StreamProgresses1 = lists:foldl(
fun(#{stream := Stream, iterator := It}, ProgressesAcc) ->
%% Assert Stream is assigned to Agent
Agent = maps:get(Stream, StreamAssignments),
ProgressesAcc#{Stream => It}
end,
StreamProgresses0,
AgentStreamProgresses
),
Data0#{stream_progresses => StreamProgresses1}
end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
router_id() ->
emqx_guid:to_hexstr(emqx_guid:gen()).
now_ms() ->
erlang:system_time(millisecond).
unassigned_streams(#{stream_progresses := StreamProgresses, stream_assignments := StreamAssignments}) ->
Streams = maps:keys(StreamProgresses),
AssignedStreams = maps:keys(StreamAssignments),
Streams -- AssignedStreams.

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_leader_sup).
-behaviour(supervisor).
%% API
-export([
start_link/0,
start_leader/1,
stop_leader/1
]).
%% supervisor behaviour callbacks
-export([init/1]).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec start_link() -> supervisor:startlink_ret().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret().
start_leader(Options) ->
ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options),
supervisor:start_child(?MODULE, ChildSpec).
-spec stop_leader(emqx_ds_shared_sub_leader:topic_filter()) -> ok | {error, term()}.
stop_leader(TopicFilter) ->
supervisor:terminate_child(?MODULE, emqx_ds_shared_sub_leader:id(TopicFilter)).
%%------------------------------------------------------------------------------
%% supervisor behaviour callbacks
%%------------------------------------------------------------------------------
init([]) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 10
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% TODO
%% This should be wrapped with a proto_v1 module.
%% For simplicity, send as simple OTP messages for now.
-module(emqx_ds_shared_sub_proto).
-include("emqx_ds_shared_sub_proto.hrl").
-export([
agent_connect_leader/3,
agent_update_stream_states/4,
leader_lease_streams/4,
leader_renew_stream_lease/3
]).
-type agent() :: pid().
-type leader() :: pid().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group().
-type version() :: non_neg_integer().
-type stream_progress() :: #{
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-export_type([
agent/0,
leader/0,
group/0,
version/0,
stream_progress/0
]).
%% agent messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
ok.
-spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
ok.
%% ...
%% leader messages
-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_lease_streams(OfGroup, Streams, Version)
),
ok.
-spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_renew_stream_lease(OfGroup, Version)
),
ok.
%% ...

View File

@ -0,0 +1,86 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc Asynchronous messages between shared sub agent and shared sub leader
-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL).
-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true).
%% TODO
%% Make integer keys on GA
%% NOTE
%% We do not need any kind of request/response identification,
%% because the protocol is fully event-based.
%% agent messages, sent from agent side to the leader
-define(agent_connect_leader_msg, agent_connect_leader).
-define(agent_update_stream_states_msg, agent_update_stream_states).
-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
%% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender.
-define(agent_connect_leader(Agent, TopicFilter), #{
type => ?agent_connect_leader_msg,
topic_filter => TopicFilter,
agent => Agent
}).
-define(agent_connect_leader_match(Agent, TopicFilter), #{
type := ?agent_connect_leader_msg,
topic_filter := TopicFilter,
agent := Agent
}).
-define(agent_update_stream_states(Agent, StreamStates, Version), #{
type => ?agent_update_stream_states_msg,
stream_states => StreamStates,
version => Version,
agent => Agent
}).
-define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
type := ?agent_update_stream_states_msg,
stream_states := StreamStates,
version := Version,
agent := Agent
}).
%% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders
%% `group` field is used to identify the leader.
-define(leader_lease_streams_msg, leader_lease_streams).
-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
-define(leader_lease_streams(Group, Streams, Version), #{
type => ?leader_lease_streams_msg,
streams => Streams,
version => Version,
group => Group
}).
-define(leader_lease_streams_match(Group, Streams, Version), #{
type := ?leader_lease_streams_msg,
streams := Streams,
version := Version,
group := Group
}).
-define(leader_renew_stream_lease(Group, Version), #{
type => ?leader_renew_stream_lease_msg,
version => Version,
group => Group
}).
-define(leader_renew_stream_lease_match(Group, Version), #{
type := ?leader_renew_stream_lease_msg,
version := Version,
group := Group
}).
-endif.

View File

@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_registry).
-behaviour(gen_server).
-include_lib("emqx/include/emqx.hrl").
-include("emqx_ds_shared_sub.hrl").
-export([
start_link/0,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).
-export([
lookup_leader/2
]).
-record(lookup_leader, {
agent :: emqx_ds_shared_sub:agent(),
topic_filter :: emqx_persistent_session_ds:share_topic_filter()
}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
lookup_leader(Agent, TopicFilter) ->
gen_server:cast(?MODULE, #lookup_leader{agent = Agent, topic_filter = TopicFilter}).
%%--------------------------------------------------------------------
%% Internal API
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, #{}}.
handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}.
handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) ->
State1 = do_lookup_leader(Agent, TopicFilter, State),
{noreply, State1}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
do_lookup_leader(Agent, TopicFilter, State) ->
%% TODO
%% Cluster-wide unique leader election should be implemented
Id = emqx_ds_shared_sub_leader:id(TopicFilter),
LeaderPid =
case gproc:where(?gproc_id(Id)) of
undefined ->
{ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{
topic_filter => TopicFilter
}),
{ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register(
Pid,
fun() ->
{LPid, _} = gproc:reg_or_locate(?gproc_id(Id)),
LPid
end
),
NewLeaderPid;
Pid ->
Pid
end,
ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter),
State.