From e3c4816035a846d58aa8434fbceaade42753bb5d Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 19 Jun 2024 17:26:10 +0300 Subject: [PATCH] feat(queue): move group subscription state machine to its own module --- ...emqx_persistent_session_ds_shared_subs.erl | 1 + .../shared_subs_agent.hrl | 4 +- .../src/emqx_ds_shared_sub_agent.erl | 173 ++++++++---------- .../src/emqx_ds_shared_sub_group_sm.erl | 143 +++++++++++++++ .../src/emqx_ds_shared_sub_leader.erl | 31 +++- .../src/emqx_ds_shared_sub_leader_sup.erl | 12 ++ .../src/emqx_ds_shared_sub_proto.erl | 4 +- .../src/emqx_ds_shared_sub_registry.erl | 19 +- .../src/emqx_ds_shared_sub_sup.erl | 5 +- 9 files changed, 285 insertions(+), 107 deletions(-) create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 835a2ed12..31be7cc0b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -107,6 +107,7 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> msg => shared_subs_new_stream_lease_events, stream_lease_events => StreamLeaseEvents } ), + % StreamLeaseEvents =/= [] andalso ct:print("StreamLeaseEvents: ~p~n", [StreamLeaseEvents]), S1 = lists:foldl( fun (#{type := lease} = Event, S) -> accept_stream(Event, S); diff --git a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl index 6ec42d36a..9cd6a7d74 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl @@ -10,10 +10,10 @@ -if(?EMQX_RELEASE_EDITION == ee). %% agent from BSL app -% -define(shared_subs_agent, emqx_ds_shared_sub_agent). +-define(shared_subs_agent, emqx_ds_shared_sub_agent). %% Till full implementation we need to dispach to the null agent. %% It will report "not implemented" error for attempts to use shared subscriptions. --define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). +% -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). %% -if(?EMQX_RELEASE_EDITION == ee). -else. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 42a642d54..0c66759fd 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -4,7 +4,6 @@ -module(emqx_ds_shared_sub_agent). --include_lib("emqx/include/emqx_persistent_message.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -22,14 +21,13 @@ renew_streams/1 ]). -%% Individual subscription state - --define(connecting, connecting). --define(replaying, replaying). -% -define(updating, updating). - -behaviour(emqx_persistent_session_ds_shared_subs_agent). +-record(message_to_group_sm, { + group :: emqx_types:group(), + message :: term() +}). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -41,7 +39,7 @@ open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( fun({ShareTopicFilter, #{}}, State) -> - add_subscription(State, ShareTopicFilter) + add_group_subscription(State, ShareTopicFilter) end, State0, TopicSubscriptions @@ -49,38 +47,44 @@ open(TopicSubscriptions, Opts) -> State1. on_subscribe(State0, TopicFilter, _SubOpts) -> - State1 = add_subscription(State0, TopicFilter), + State1 = add_group_subscription(State0, TopicFilter), {ok, State1}. on_unsubscribe(State, TopicFilter) -> - delete_subscription(State, TopicFilter). + delete_group_subscription(State, TopicFilter). renew_streams(#{} = State) -> fetch_stream_events(State). on_stream_progress(State, _StreamProgress) -> + %% TODO + %% Send to leader State. on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) -> - case State of - #{subscriptions := #{Group := Sub0} = Subs} -> - Sub1 = handle_leader_lease_streams(Sub0, StreamProgresses, Version), - State#{subscriptions => Subs#{Group => Sub1}}; - _ -> - %% TODO - %% Handle unknown group? - State - end; + ?SLOG(info, #{ + msg => leader_lease_streams, + group => Group, + streams => StreamProgresses, + version => Version + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version) + end); on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> - case State of - #{subscriptions := #{Group := Sub0} = Subs} -> - Sub1 = handle_leader_renew_stream_lease(Sub0, Version), - State#{subscriptions => Subs#{Group => Sub1}}; - _ -> - %% TODO - %% Handle unknown group? - State - end. + ?SLOG(info, #{ + msg => leader_renew_stream_lease, + group => Group, + version => Version + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) + end); +%% Generic messages sent by group_sm's to themselves (timeouts). +on_info(State, #message_to_group_sm{group = Group, message = Message}) -> + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_info(GSM, Message) + end). %%-------------------------------------------------------------------- %% Internal functions @@ -90,92 +94,65 @@ init_state(Opts) -> SessionId = maps:get(session_id, Opts), #{ session_id => SessionId, - subscriptions => #{} + groups => #{} }. -delete_subscription(State, _ShareTopicFilter) -> +delete_group_subscription(State, _ShareTopicFilter) -> %% TODO State. -add_subscription( - #{subscriptions := Subs0} = State0, ShareTopicFilter +add_group_subscription( + #{groups := Groups0} = State0, ShareTopicFilter ) -> - #share{topic = TopicFilter, group = Group} = ShareTopicFilter, - ok = emqx_ds_shared_sub_registry:lookup_leader(this_agent(), TopicFilter), - Subs1 = Subs0#{ - %% TODO - %% State machine is complex, so better move it to a separate module - Group => #{ - state => ?connecting, + ?SLOG(info, #{ + msg => agent_add_group_subscription, + topic_filter => ShareTopicFilter + }), + #share{group = Group} = ShareTopicFilter, + Groups1 = Groups0#{ + Group => emqx_ds_shared_sub_group_sm:new(#{ topic_filter => ShareTopicFilter, - streams => #{}, - version => undefined, - prev_version => undefined, - stream_lease_events => [] - } + agent => this_agent(), + send_after => send_to_subscription_after(Group) + }) }, - State1 = State0#{subscriptions => Subs1}, + State1 = State0#{groups => Groups1}, State1. -fetch_stream_events(#{subscriptions := Subs0} = State0) -> - {Subs1, Events} = lists:foldl( - fun( - {_Group, #{stream_lease_events := Events0, topic_filter := TopicFilter} = Sub}, - {SubsAcc, EventsAcc} - ) -> - Events1 = lists:map( - fun(Event) -> - Event#{topic_filter => TopicFilter} - end, - Events0 - ), - {SubsAcc#{TopicFilter => Sub#{stream_lease_events => []}}, [Events1 | EventsAcc]} +fetch_stream_events(#{groups := Groups0} = State0) -> + {Groups1, Events} = maps:fold( + fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) -> + {GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0), + {GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]} end, - {Subs0, []}, - maps:to_list(Subs0) + {#{}, []}, + Groups0 ), - State1 = State0#{subscriptions => Subs1}, + State1 = State0#{groups => Groups1}, {lists:concat(Events), State1}. -%%-------------------------------------------------------------------- -%% Handler of leader messages -%%-------------------------------------------------------------------- - -handle_leader_lease_streams(#{state := ?connecting} = Sub, StreamProgresses, Version) -> - Streams = lists:foldl( - fun(#{stream := Stream, iterator := It}, Acc) -> - Acc#{Stream => It} - end, - #{}, - StreamProgresses - ), - StreamLeaseEvents = lists:map( - fun(#{stream := Stream, iterator := It}) -> - #{ - type => lease, - stream => Stream, - iterator => It - } - end, - StreamProgresses - ), - Sub#{ - state => ?replaying, - streams => Streams, - stream_lease_events => StreamLeaseEvents, - version => Version, - last_update_time => erlang:monotonic_time(millisecond) - }. - -handle_leader_renew_stream_lease(#{state := ?replaying, version := Version} = Sub, Version) -> - Sub#{ - last_update_time => erlang:monotonic_time(millisecond) - }; -handle_leader_renew_stream_lease(Sub, _Version) -> - Sub. - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- this_agent() -> self(). + +send_to_subscription_after(Group) -> + fun(Time, Msg) -> + emqx_persistent_session_ds_shared_subs_agent:send_after( + Time, + self(), + #message_to_group_sm{group = Group, message = Msg} + ) + end. + +with_group_sm(State, Group, Fun) -> + case State of + #{groups := #{Group := GSM0} = Groups} -> + GSM1 = Fun(GSM0), + State#{groups => Groups#{Group => GSM1}}; + _ -> + %% TODO + %% Error? + State + end. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl new file mode 100644 index 000000000..b5e8c3f4c --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% @doc State machine for a single subscription of a shared subscription agent. +%% Implements GSFSM described in +%% https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md + +%% `group_sm` stands for "group state machine". +-module(emqx_ds_shared_sub_group_sm). + +-include_lib("emqx/include/logger.hrl"). + +-export([ + new/1, + + %% Leader messages + handle_leader_lease_streams/3, + handle_leader_renew_stream_lease/2, + + %% Self-initiated messages + handle_info/2, + + %% API + fetch_stream_events/1 +]). + +-type options() :: #{ + agent := emqx_ds_shared_sub_proto:agent(), + topic_filter := emqx_persistent_session_ds:share_topic_filter(), + send_after := fun((non_neg_integer(), term()) -> reference()) +}. + +%% Subscription states + +-define(connecting, connecting). +-define(replaying, replaying). +-define(updating, updating). + +-type group_sm() :: #{ + topic_filter => emqx_persistent_session_ds:share_topic_filter(), + agent => emqx_ds_shared_sub_proto:agent(), + send_after => fun((non_neg_integer(), term()) -> reference()), + + state => ?connecting | ?replaying | ?updating, + state_data => map() +}. + +-spec new(options()) -> group_sm(). +new(#{ + agent := Agent, + topic_filter := ShareTopicFilter, + send_after := SendAfter +}) -> + ?SLOG( + info, + #{ + msg => group_sm_new, + agent => Agent, + topic_filter => ShareTopicFilter + } + ), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), + #{ + topic_filter => ShareTopicFilter, + agent => Agent, + send_after => SendAfter, + + state => ?connecting, + state_data => #{} + }. + +handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Version) -> + Streams = lists:foldl( + fun(#{stream := Stream, iterator := It}, Acc) -> + Acc#{Stream => It} + end, + #{}, + StreamProgresses + ), + StreamLeaseEvents = lists:map( + fun(#{stream := Stream, iterator := It}) -> + #{ + type => lease, + stream => Stream, + iterator => It + } + end, + StreamProgresses + ), + GSM#{ + state => ?replaying, + state_data => #{ + streams => Streams, + stream_lease_events => StreamLeaseEvents, + prev_version => undefined, + version => Version, + last_update_time => erlang:monotonic_time(millisecond) + } + }; +handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> + GSM. + +handle_leader_renew_stream_lease( + #{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version +) -> + GSM#{ + state_data => Data#{last_update_time => erlang:monotonic_time(millisecond)} + }; +handle_leader_renew_stream_lease(GSM, _Version) -> + GSM. + +handle_info(GSM, _Info) -> + GSM. + +fetch_stream_events( + #{ + state := ?replaying, + topic_filter := TopicFilter, + state_data := #{stream_lease_events := Events0} = Data + } = GSM +) -> + Events1 = lists:map( + fun(Event) -> + Event#{topic_filter => TopicFilter} + end, + Events0 + ), + { + GSM#{ + state_data => Data#{stream_lease_events => []} + }, + Events1 + }; +fetch_stream_events(GSM) -> + {GSM, []}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +% send_after(#{send_after := SendAfter} = _GSM, Delay, Message) -> +% SendAfter(Delay, Message). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index dea8dcf98..0199b0eed 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -109,7 +109,7 @@ id(#share{group = Group} = _TopicFilter) -> %% gen_statem callbacks %%-------------------------------------------------------------------- -callback_mode() -> handle_event_function. +callback_mode() -> [handle_event_function, state_enter]. init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> Data = #{ @@ -117,7 +117,8 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> topic => Topic, router_id => router_id(), stream_progresses => #{}, - stream_assignments => #{} + stream_assignments => #{}, + agent_stream_assignments => #{} }, {ok, ?waiting_registration, Data}. @@ -199,10 +200,15 @@ renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) -> ), %% TODO %% Initiate reassigment + ?SLOG(info, #{ + msg => leader_renew_streams, + topic_filter => TopicFilter, + streams => length(Streams) + }), Data0#{stream_progresses => NewProgresses}. %% TODO -%% This just gives unassigned streams to connecting agent, +%% This just gives unassigned streams to the connecting agent, %% we need to implement actual stream (re)assignment. connect_agent( #{ @@ -213,6 +219,11 @@ connect_agent( } = Data0, Agent ) -> + ?SLOG(info, #{ + msg => leader_agent_connected, + agent => Agent, + group => Group + }), {AgentStreamAssignments, StreamAssignments} = case AgentStreamAssignments0 of #{Agent := _} -> @@ -242,6 +253,20 @@ connect_agent( end, UnassignedStreams ), + ?SLOG(info, #{ + msg => leader_lease_streams, + agent => Agent, + group => Group, + streams => length(StreamLease), + version => Version + }), + % ct:print("connect_agent: ~p~n", [#{ + % msg => leader_lease_streams, + % agent => Agent, + % group => Group, + % streams => length(StreamLease), + % version => Version + % }]), ok = emqx_ds_shared_sub_proto:leader_lease_streams( Agent, Group, StreamLease, Version ), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl index ea4111391..27978f07d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl @@ -9,6 +9,8 @@ %% API -export([ start_link/0, + child_spec/0, + start_leader/1, stop_leader/1 ]). @@ -24,6 +26,16 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). +-spec child_spec() -> supervisor:child_spec(). +child_spec() -> + #{ + id => ?MODULE, + start => {?MODULE, start_link, []}, + restart => permanent, + shutdown => 5000, + type => supervisor + }. + -spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret(). start_leader(Options) -> ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index b513d024f..c9609a773 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -37,7 +37,7 @@ stream_progress/0 ]). -%% agent messages +%% agent -> leader messages -spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> @@ -51,7 +51,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> %% ... -%% leader messages +%% leader -> agent messages -spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok. leader_lease_streams(ToAgent, OfGroup, Streams, Version) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl index 6dcd86c4d..70746264a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -6,11 +6,13 @@ -behaviour(gen_server). --include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_ds_shared_sub.hrl"). -export([ start_link/0, + child_spec/0, + init/1, handle_call/3, handle_cast/2, @@ -41,6 +43,15 @@ lookup_leader(Agent, TopicFilter) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +child_spec() -> + #{ + id => ?MODULE, + start => {?MODULE, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker + }. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -86,5 +97,11 @@ do_lookup_leader(Agent, TopicFilter, State) -> Pid -> Pid end, + ?SLOG(info, #{ + msg => lookup_leader, + agent => Agent, + topic_filter => TopicFilter, + leader => LeaderPid + }), ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter), State. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl index dd6100f49..137f0680a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl @@ -29,5 +29,8 @@ init([]) -> intensity => 10, period => 10 }, - ChildSpecs = [], + ChildSpecs = [ + emqx_ds_shared_sub_registry:child_spec(), + emqx_ds_shared_sub_leader_sup:child_spec() + ], {ok, {SupFlags, ChildSpecs}}.