feat(queue): move group subscription state machine to its own module

This commit is contained in:
Ilya Averyanov 2024-06-19 17:26:10 +03:00
parent bca743054b
commit e3c4816035
9 changed files with 285 additions and 107 deletions

View File

@ -107,6 +107,7 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
msg => shared_subs_new_stream_lease_events, stream_lease_events => StreamLeaseEvents
}
),
% StreamLeaseEvents =/= [] andalso ct:print("StreamLeaseEvents: ~p~n", [StreamLeaseEvents]),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);

View File

@ -10,10 +10,10 @@
-if(?EMQX_RELEASE_EDITION == ee).
%% agent from BSL app
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
-define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% Till full implementation we need to dispach to the null agent.
%% It will report "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
%% -if(?EMQX_RELEASE_EDITION == ee).
-else.

View File

@ -4,7 +4,6 @@
-module(emqx_ds_shared_sub_agent).
-include_lib("emqx/include/emqx_persistent_message.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -22,14 +21,13 @@
renew_streams/1
]).
%% Individual subscription state
-define(connecting, connecting).
-define(replaying, replaying).
% -define(updating, updating).
-behaviour(emqx_persistent_session_ds_shared_subs_agent).
-record(message_to_group_sm, {
group :: emqx_types:group(),
message :: term()
}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -41,7 +39,7 @@ open(TopicSubscriptions, Opts) ->
State0 = init_state(Opts),
State1 = lists:foldl(
fun({ShareTopicFilter, #{}}, State) ->
add_subscription(State, ShareTopicFilter)
add_group_subscription(State, ShareTopicFilter)
end,
State0,
TopicSubscriptions
@ -49,38 +47,44 @@ open(TopicSubscriptions, Opts) ->
State1.
on_subscribe(State0, TopicFilter, _SubOpts) ->
State1 = add_subscription(State0, TopicFilter),
State1 = add_group_subscription(State0, TopicFilter),
{ok, State1}.
on_unsubscribe(State, TopicFilter) ->
delete_subscription(State, TopicFilter).
delete_group_subscription(State, TopicFilter).
renew_streams(#{} = State) ->
fetch_stream_events(State).
on_stream_progress(State, _StreamProgress) ->
%% TODO
%% Send to leader
State.
on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) ->
case State of
#{subscriptions := #{Group := Sub0} = Subs} ->
Sub1 = handle_leader_lease_streams(Sub0, StreamProgresses, Version),
State#{subscriptions => Subs#{Group => Sub1}};
_ ->
%% TODO
%% Handle unknown group?
State
end;
?SLOG(info, #{
msg => leader_lease_streams,
group => Group,
streams => StreamProgresses,
version => Version
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
case State of
#{subscriptions := #{Group := Sub0} = Subs} ->
Sub1 = handle_leader_renew_stream_lease(Sub0, Version),
State#{subscriptions => Subs#{Group => Sub1}};
_ ->
%% TODO
%% Handle unknown group?
State
end.
?SLOG(info, #{
msg => leader_renew_stream_lease,
group => Group,
version => Version
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
end);
%% Generic messages sent by group_sm's to themselves (timeouts).
on_info(State, #message_to_group_sm{group = Group, message = Message}) ->
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_info(GSM, Message)
end).
%%--------------------------------------------------------------------
%% Internal functions
@ -90,92 +94,65 @@ init_state(Opts) ->
SessionId = maps:get(session_id, Opts),
#{
session_id => SessionId,
subscriptions => #{}
groups => #{}
}.
delete_subscription(State, _ShareTopicFilter) ->
delete_group_subscription(State, _ShareTopicFilter) ->
%% TODO
State.
add_subscription(
#{subscriptions := Subs0} = State0, ShareTopicFilter
add_group_subscription(
#{groups := Groups0} = State0, ShareTopicFilter
) ->
#share{topic = TopicFilter, group = Group} = ShareTopicFilter,
ok = emqx_ds_shared_sub_registry:lookup_leader(this_agent(), TopicFilter),
Subs1 = Subs0#{
%% TODO
%% State machine is complex, so better move it to a separate module
Group => #{
state => ?connecting,
?SLOG(info, #{
msg => agent_add_group_subscription,
topic_filter => ShareTopicFilter
}),
#share{group = Group} = ShareTopicFilter,
Groups1 = Groups0#{
Group => emqx_ds_shared_sub_group_sm:new(#{
topic_filter => ShareTopicFilter,
streams => #{},
version => undefined,
prev_version => undefined,
stream_lease_events => []
}
agent => this_agent(),
send_after => send_to_subscription_after(Group)
})
},
State1 = State0#{subscriptions => Subs1},
State1 = State0#{groups => Groups1},
State1.
fetch_stream_events(#{subscriptions := Subs0} = State0) ->
{Subs1, Events} = lists:foldl(
fun(
{_Group, #{stream_lease_events := Events0, topic_filter := TopicFilter} = Sub},
{SubsAcc, EventsAcc}
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
end,
Events0
),
{SubsAcc#{TopicFilter => Sub#{stream_lease_events => []}}, [Events1 | EventsAcc]}
fetch_stream_events(#{groups := Groups0} = State0) ->
{Groups1, Events} = maps:fold(
fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) ->
{GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0),
{GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]}
end,
{Subs0, []},
maps:to_list(Subs0)
{#{}, []},
Groups0
),
State1 = State0#{subscriptions => Subs1},
State1 = State0#{groups => Groups1},
{lists:concat(Events), State1}.
%%--------------------------------------------------------------------
%% Handler of leader messages
%%--------------------------------------------------------------------
handle_leader_lease_streams(#{state := ?connecting} = Sub, StreamProgresses, Version) ->
Streams = lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) ->
Acc#{Stream => It}
end,
#{},
StreamProgresses
),
StreamLeaseEvents = lists:map(
fun(#{stream := Stream, iterator := It}) ->
#{
type => lease,
stream => Stream,
iterator => It
}
end,
StreamProgresses
),
Sub#{
state => ?replaying,
streams => Streams,
stream_lease_events => StreamLeaseEvents,
version => Version,
last_update_time => erlang:monotonic_time(millisecond)
}.
handle_leader_renew_stream_lease(#{state := ?replaying, version := Version} = Sub, Version) ->
Sub#{
last_update_time => erlang:monotonic_time(millisecond)
};
handle_leader_renew_stream_lease(Sub, _Version) ->
Sub.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
this_agent() -> self().
send_to_subscription_after(Group) ->
fun(Time, Msg) ->
emqx_persistent_session_ds_shared_subs_agent:send_after(
Time,
self(),
#message_to_group_sm{group = Group, message = Msg}
)
end.
with_group_sm(State, Group, Fun) ->
case State of
#{groups := #{Group := GSM0} = Groups} ->
GSM1 = Fun(GSM0),
State#{groups => Groups#{Group => GSM1}};
_ ->
%% TODO
%% Error?
State
end.

View File

@ -0,0 +1,143 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc State machine for a single subscription of a shared subscription agent.
%% Implements GSFSM described in
%% https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md
%% `group_sm` stands for "group state machine".
-module(emqx_ds_shared_sub_group_sm).
-include_lib("emqx/include/logger.hrl").
-export([
new/1,
%% Leader messages
handle_leader_lease_streams/3,
handle_leader_renew_stream_lease/2,
%% Self-initiated messages
handle_info/2,
%% API
fetch_stream_events/1
]).
-type options() :: #{
agent := emqx_ds_shared_sub_proto:agent(),
topic_filter := emqx_persistent_session_ds:share_topic_filter(),
send_after := fun((non_neg_integer(), term()) -> reference())
}.
%% Subscription states
-define(connecting, connecting).
-define(replaying, replaying).
-define(updating, updating).
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
state => ?connecting | ?replaying | ?updating,
state_data => map()
}.
-spec new(options()) -> group_sm().
new(#{
agent := Agent,
topic_filter := ShareTopicFilter,
send_after := SendAfter
}) ->
?SLOG(
info,
#{
msg => group_sm_new,
agent => Agent,
topic_filter => ShareTopicFilter
}
),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
#{
topic_filter => ShareTopicFilter,
agent => Agent,
send_after => SendAfter,
state => ?connecting,
state_data => #{}
}.
handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Version) ->
Streams = lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) ->
Acc#{Stream => It}
end,
#{},
StreamProgresses
),
StreamLeaseEvents = lists:map(
fun(#{stream := Stream, iterator := It}) ->
#{
type => lease,
stream => Stream,
iterator => It
}
end,
StreamProgresses
),
GSM#{
state => ?replaying,
state_data => #{
streams => Streams,
stream_lease_events => StreamLeaseEvents,
prev_version => undefined,
version => Version,
last_update_time => erlang:monotonic_time(millisecond)
}
};
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
GSM.
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version
) ->
GSM#{
state_data => Data#{last_update_time => erlang:monotonic_time(millisecond)}
};
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
handle_info(GSM, _Info) ->
GSM.
fetch_stream_events(
#{
state := ?replaying,
topic_filter := TopicFilter,
state_data := #{stream_lease_events := Events0} = Data
} = GSM
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
end,
Events0
),
{
GSM#{
state_data => Data#{stream_lease_events => []}
},
Events1
};
fetch_stream_events(GSM) ->
{GSM, []}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
% send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
% SendAfter(Delay, Message).

View File

@ -109,7 +109,7 @@ id(#share{group = Group} = _TopicFilter) ->
%% gen_statem callbacks
%%--------------------------------------------------------------------
callback_mode() -> handle_event_function.
callback_mode() -> [handle_event_function, state_enter].
init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
Data = #{
@ -117,7 +117,8 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
topic => Topic,
router_id => router_id(),
stream_progresses => #{},
stream_assignments => #{}
stream_assignments => #{},
agent_stream_assignments => #{}
},
{ok, ?waiting_registration, Data}.
@ -199,10 +200,15 @@ renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
),
%% TODO
%% Initiate reassigment
?SLOG(info, #{
msg => leader_renew_streams,
topic_filter => TopicFilter,
streams => length(Streams)
}),
Data0#{stream_progresses => NewProgresses}.
%% TODO
%% This just gives unassigned streams to connecting agent,
%% This just gives unassigned streams to the connecting agent,
%% we need to implement actual stream (re)assignment.
connect_agent(
#{
@ -213,6 +219,11 @@ connect_agent(
} = Data0,
Agent
) ->
?SLOG(info, #{
msg => leader_agent_connected,
agent => Agent,
group => Group
}),
{AgentStreamAssignments, StreamAssignments} =
case AgentStreamAssignments0 of
#{Agent := _} ->
@ -242,6 +253,20 @@ connect_agent(
end,
UnassignedStreams
),
?SLOG(info, #{
msg => leader_lease_streams,
agent => Agent,
group => Group,
streams => length(StreamLease),
version => Version
}),
% ct:print("connect_agent: ~p~n", [#{
% msg => leader_lease_streams,
% agent => Agent,
% group => Group,
% streams => length(StreamLease),
% version => Version
% }]),
ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, StreamLease, Version
),

View File

@ -9,6 +9,8 @@
%% API
-export([
start_link/0,
child_spec/0,
start_leader/1,
stop_leader/1
]).
@ -24,6 +26,16 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec child_spec() -> supervisor:child_spec().
child_spec() ->
#{
id => ?MODULE,
start => {?MODULE, start_link, []},
restart => permanent,
shutdown => 5000,
type => supervisor
}.
-spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret().
start_leader(Options) ->
ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options),

View File

@ -37,7 +37,7 @@
stream_progress/0
]).
%% agent messages
%% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
@ -51,7 +51,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
%% ...
%% leader messages
%% leader -> agent messages
-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->

View File

@ -6,11 +6,13 @@
-behaviour(gen_server).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_ds_shared_sub.hrl").
-export([
start_link/0,
child_spec/0,
init/1,
handle_call/3,
handle_cast/2,
@ -41,6 +43,15 @@ lookup_leader(Agent, TopicFilter) ->
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
child_spec() ->
#{
id => ?MODULE,
start => {?MODULE, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker
}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -86,5 +97,11 @@ do_lookup_leader(Agent, TopicFilter, State) ->
Pid ->
Pid
end,
?SLOG(info, #{
msg => lookup_leader,
agent => Agent,
topic_filter => TopicFilter,
leader => LeaderPid
}),
ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter),
State.

View File

@ -29,5 +29,8 @@ init([]) ->
intensity => 10,
period => 10
},
ChildSpecs = [],
ChildSpecs = [
emqx_ds_shared_sub_registry:child_spec(),
emqx_ds_shared_sub_leader_sup:child_spec()
],
{ok, {SupFlags, ChildSpecs}}.