diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index cc599eb06..00cafb1c2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -660,30 +660,7 @@ handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := Share %%-------------------------------------------------------------------- shared_sub_opts(SessionId) -> - #{ - session_id => SessionId, - send_funs => #{ - send => fun send_message/2, - send_after => fun send_message_after/3 - } - }. - -send_message(Dest, Msg) -> - case Dest =:= self() of - true -> - erlang:send(Dest, ?session_message(?shared_sub_message(Msg))), - Msg; - false -> - erlang:send(Dest, Msg) - end. - -send_message_after(Time, Dest, Msg) -> - case Dest =:= self() of - true -> - erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))); - false -> - erlang:send_after(Time, Dest, Msg) - end. + #{session_id => SessionId}. bump_last_alive(S0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index c355af0a6..c4e929640 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -23,23 +23,14 @@ to_map/2 ]). --record(agent_message, { - message :: term() -}). - -type t() :: #{ agent := emqx_persistent_session_ds_shared_subs_agent:t() }. -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type opts() :: #{ - session_id := emqx_persistent_session_ds:id(), - send_funs := #{ - send := fun((pid(), term()) -> term()), - send_after := fun((non_neg_integer(), pid(), term()) -> reference()) - } + session_id := emqx_persistent_session_ds:id() }. 
--define(agent_message(Msg), #agent_message{message = Msg}). -define(rank_x, rank_shared). -define(rank_y, 0). @@ -107,17 +98,20 @@ on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) -> -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> {emqx_persistent_session_ds_state:t(), t()}. renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> - {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( + {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( Agent0 ), - NewLeasedStreams =/= [] andalso - ?SLOG( - info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams} - ), - S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams), - S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams), + ?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}), + S1 = lists:foldl( + fun + (#{type := lease} = Event, S) -> accept_stream(Event, S); + (#{type := revoke} = Event, S) -> revoke_stream(Event, S) + end, + S0, + StreamLeaseEvents + ), SharedSubS1 = SharedSubS0#{agent => Agent1}, - {S2, SharedSubS1}. + {S1, SharedSubS1}. -spec on_streams_replayed( emqx_persistent_session_ds_state:t(), @@ -147,14 +141,10 @@ on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> {emqx_persistent_session_ds_state:t(), t()}. -on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) -> +on_info(S, #{agent := Agent0} = SharedSubS0, Info) -> Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info), SharedSubS1 = SharedSubS0#{agent => Agent1}, - {S, SharedSubS1}; -on_info(S, SharedSubS, _Info) -> - %% TODO - %% Log warning - {S, SharedSubS}. + {S, SharedSubS1}. -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map(). 
to_map(_S, _SharedSubS) -> @@ -340,39 +330,8 @@ to_agent_subscription(_S, Subscription) -> maps:with([start_time], Subscription). -spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts(). -agent_opts(#{session_id := SessionId, send_funs := SendFuns}) -> - #{ - session_id => SessionId, - send_funs => agent_send_funs(SendFuns) - }. - -agent_send_funs(#{ - send := Send, - send_after := SendAfter -}) -> - #{ - send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end, - send_after => fun(Time, Pid, Msg) -> - send_after_from_agent(SendAfter, Time, Pid, Msg) - end - }. - -send_from_agent(Send, Dest, Msg) -> - case Dest =:= self() of - true -> - Send(Dest, ?agent_message(Msg)), - Msg; - false -> - Send(Dest, Msg) - end. - -send_after_from_agent(SendAfter, Time, Dest, Msg) -> - case Dest =:= self() of - true -> - SendAfter(Time, Dest, ?agent_message(Msg)); - false -> - SendAfter(Time, Dest, Msg) - end. +agent_opts(#{session_id := SessionId}) -> + #{session_id => SessionId}. -dialyzer({nowarn_function, now_ms/0}). now_ms() -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 31a80838f..97b38d0f2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -5,6 +5,8 @@ -module(emqx_persistent_session_ds_shared_subs_agent). -include("shared_subs_agent.hrl"). +-include("emqx_session.hrl"). +-include("session_internals.hrl"). -type session_id() :: emqx_persistent_session_ds:id(). @@ -16,18 +18,15 @@ -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). 
-type opts() :: #{ - session_id := session_id(), - send_funs := #{ - send := fun((pid(), term()) -> term()), - send_after := fun((non_neg_integer(), pid(), term()) -> reference()) - } + session_id := session_id() }. %% TODO -%% This records goe through network, we better shrink them +%% This records go through network, we better shrink them %% * use integer keys %% * somehow avoid passing stream and topic_filter — they both are part of the iterator -type stream_lease() :: #{ + type => lease, %% Used as "external" subscription_id topic_filter := topic_filter(), stream := emqx_ds:stream(), @@ -35,10 +34,13 @@ }. -type stream_revoke() :: #{ + type => revoke, topic_filter := topic_filter(), stream := emqx_ds:stream() }. +-type stream_lease_event() :: stream_lease() | stream_revoke(). + -type stream_progress() :: #{ topic_filter := topic_filter(), stream := emqx_ds:stream(), @@ -65,6 +67,11 @@ renew_streams/1 ]). +-export([ + send/2, + send_after/3 +]). + %%-------------------------------------------------------------------- %% Behaviour %%-------------------------------------------------------------------- @@ -74,7 +81,7 @@ -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> {ok, t()} | {error, term()}. -callback on_unsubscribe(t(), topic_filter()) -> t(). --callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback on_stream_progress(t(), [stream_progress()]) -> t(). -callback on_info(t(), term()) -> t(). @@ -99,7 +106,7 @@ on_subscribe(Agent, TopicFilter, SubOpts) -> on_unsubscribe(Agent, TopicFilter) -> ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). --spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-spec renew_streams(t()) -> {[stream_lease_event()], t()}. renew_streams(Agent) -> ?shared_subs_agent:renew_streams(Agent). @@ -110,3 +117,11 @@ on_stream_progress(Agent, StreamProgress) -> -spec on_info(t(), term()) -> t(). 
on_info(Agent, Info) -> ?shared_subs_agent:on_info(Agent, Info). + +-spec send(pid(), term()) -> term(). +send(Dest, Msg) -> + erlang:send(Dest, ?session_message(?shared_sub_message(Msg))). + +-spec send_after(non_neg_integer(), pid(), term()) -> reference(). +send_after(Time, Dest, Msg) -> + erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl index 4896fb01e..e158c19e2 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl @@ -37,7 +37,7 @@ on_unsubscribe(Agent, _TopicFilter) -> Agent. renew_streams(Agent) -> - {[], [], Agent}. + {[], Agent}. on_stream_progress(Agent, _StreamProgress) -> Agent. diff --git a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl index 6ec42d36a..4fcd43e8a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl @@ -10,25 +10,35 @@ -if(?EMQX_RELEASE_EDITION == ee). %% agent from BSL app -% -define(shared_subs_agent, emqx_ds_shared_sub_agent). + +-ifdef(TEST). + +-define(shared_subs_agent, emqx_ds_shared_sub_agent). + +%% clause of -ifdef(TEST). +-else. + %% Till full implementation we need to dispach to the null agent. %% It will report "not implemented" error for attempts to use shared subscriptions. -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). -%% -if(?EMQX_RELEASE_EDITION == ee). +%% end of -ifdef(TEST). +-endif. + +%% clause of -if(?EMQX_RELEASE_EDITION == ee). -else. -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). 
-%% -if(?EMQX_RELEASE_EDITION == ee). +%% end of -if(?EMQX_RELEASE_EDITION == ee). -endif. -%% -ifdef(EMQX_RELEASE_EDITION). +%% clause of -ifdef(EMQX_RELEASE_EDITION). -else. -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). -%% -ifdef(EMQX_RELEASE_EDITION). +%% end of -ifdef(EMQX_RELEASE_EDITION). -endif. -endif. diff --git a/apps/emqx_ds_shared_sub/README.md b/apps/emqx_ds_shared_sub/README.md index 41a5ab407..9c4c15870 100644 --- a/apps/emqx_ds_shared_sub/README.md +++ b/apps/emqx_ds_shared_sub/README.md @@ -1,5 +1,14 @@ # EMQX Durable Shared Subscriptions +This application makes durable sessions capable of cooperatively replaying messages from a topic. + +# General layout and interaction with session + +![General layout](docs/images/ds_shared_subs.png) + +* The nesting reflects nesting/ownership of entity states. +* The bold arrow represents the [most complex interaction](https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md#shared-subscription-session-handler) between the session-side group subscription state machine and the shared subscription leader. + # Contributing Please see our [contributing.md](../../CONTRIBUTING.md). diff --git a/apps/emqx_ds_shared_sub/docs/images/ds_shared_subs.png b/apps/emqx_ds_shared_sub/docs/images/ds_shared_subs.png new file mode 100644 index 000000000..ae138d5b1 Binary files /dev/null and b/apps/emqx_ds_shared_sub/docs/images/ds_shared_subs.png differ diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl new file mode 100644 index 000000000..95a4b7294 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.erl @@ -0,0 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub). 
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 425d5df3b..29745aa4a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -4,10 +4,11 @@ -module(emqx_ds_shared_sub_agent). --include_lib("emqx/include/emqx_persistent_message.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include("emqx_ds_shared_sub_proto.hrl"). + -export([ new/1, open/2, @@ -22,6 +23,11 @@ -behaviour(emqx_persistent_session_ds_shared_subs_agent). +-record(message_to_group_sm, { + group :: emqx_types:group(), + message :: term() +}). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -32,8 +38,8 @@ new(Opts) -> open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( - fun({ShareTopicFilter, #{start_time := StartTime}}, State) -> - add_subscription(State, ShareTopicFilter, StartTime) + fun({ShareTopicFilter, #{}}, State) -> + add_group_subscription(State, ShareTopicFilter) end, State0, TopicSubscriptions @@ -41,23 +47,44 @@ open(TopicSubscriptions, Opts) -> State1. on_subscribe(State0, TopicFilter, _SubOpts) -> - StartTime = now_ms(), - State1 = add_subscription(State0, TopicFilter, StartTime), + State1 = add_group_subscription(State0, TopicFilter), {ok, State1}. on_unsubscribe(State, TopicFilter) -> - delete_subscription(State, TopicFilter). + delete_group_subscription(State, TopicFilter). -renew_streams(State0) -> - State1 = do_renew_streams(State0), - {State2, StreamLeases} = stream_leases(State1), - {StreamLeases, [], State2}. +renew_streams(#{} = State) -> + fetch_stream_events(State). on_stream_progress(State, _StreamProgress) -> + %% TODO https://emqx.atlassian.net/browse/EMQX-12572 + %% Send to leader State. -on_info(State, _Info) -> - State. 
+on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) -> + ?SLOG(info, #{ + msg => leader_lease_streams, + group => Group, + streams => StreamProgresses, + version => Version + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version) + end); +on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> + ?SLOG(info, #{ + msg => leader_renew_stream_lease, + group => Group, + version => Version + }), + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) + end); +%% Generic messages sent by group_sm's to themselves (timeouts). +on_info(State, #message_to_group_sm{group = Group, message = Message}) -> + with_group_sm(State, Group, fun(GSM) -> + emqx_ds_shared_sub_group_sm:handle_info(GSM, Message) + end). %%-------------------------------------------------------------------- %% Internal functions @@ -65,92 +92,67 @@ on_info(State, _Info) -> init_state(Opts) -> SessionId = maps:get(session_id, Opts), - SendFuns = maps:get(send_funs, Opts), - Send = maps:get(send, SendFuns), - SendAfter = maps:get(send_after, SendFuns), #{ session_id => SessionId, - send => Send, - end_after => SendAfter, - subscriptions => #{} + groups => #{} }. -% send(State, Pid, Msg) -> -% Send = maps:get(send, State), -% Send(Pid, Msg). +delete_group_subscription(State, _ShareTopicFilter) -> + %% TODO https://emqx.atlassian.net/browse/EMQX-12572 + State. -% send_after(State, Time, Pid, Msg) -> -% SendAfter = maps:get(send_after, State), -% SendAfter(Time, Pid, Msg). 
- -do_renew_streams(#{subscriptions := Subs0} = State0) -> - Subs1 = maps:map( - fun( - ShareTopicFilter, - #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub - ) -> - #share{topic = TopicFilterRaw} = ShareTopicFilter, - TopicFilter = emqx_topic:words(TopicFilterRaw), - {_, NewStreams} = lists:unzip( - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) - ), - {Streams1, NewLeases} = lists:foldl( - fun(Stream, {StreamsAcc, LeasesAcc}) -> - case StreamsAcc of - #{Stream := _} -> - {StreamsAcc, LeasesAcc}; - _ -> - {ok, It} = emqx_ds:make_iterator( - ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime - ), - StreamLease = #{ - topic_filter => ShareTopicFilter, - stream => Stream, - iterator => It - }, - {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]} - end - end, - {Streams0, []}, - NewStreams - ), - Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases} - end, - Subs0 - ), - State0#{subscriptions => Subs1}. - -delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) -> - #share{topic = TopicFilter} = ShareTopicFilter, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), - Subs1 = maps:remove(ShareTopicFilter, Subs0), - State0#{subscriptions => Subs1}. - -stream_leases(#{subscriptions := Subs0} = State0) -> - {Subs1, StreamLeases} = lists:foldl( - fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) -> - {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]} - end, - {Subs0, []}, - maps:to_list(Subs0) - ), - State1 = State0#{subscriptions => Subs1}, - {State1, lists:concat(StreamLeases)}. - -now_ms() -> - erlang:system_time(millisecond). 
- -add_subscription( - #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime +add_group_subscription( + #{groups := Groups0} = State0, ShareTopicFilter ) -> - #share{topic = TopicFilter} = ShareTopicFilter, - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), - Subs1 = Subs0#{ - ShareTopicFilter => #{ - start_time => StartTime, - streams => #{}, - stream_leases => [] - } + ?SLOG(info, #{ + msg => agent_add_group_subscription, + topic_filter => ShareTopicFilter + }), + #share{group = Group} = ShareTopicFilter, + Groups1 = Groups0#{ + Group => emqx_ds_shared_sub_group_sm:new(#{ + topic_filter => ShareTopicFilter, + agent => this_agent(), + send_after => send_to_subscription_after(Group) + }) }, - State1 = State0#{subscriptions => Subs1}, + State1 = State0#{groups => Groups1}, State1. + +fetch_stream_events(#{groups := Groups0} = State0) -> + {Groups1, Events} = maps:fold( + fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) -> + {GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0), + {GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]} + end, + {#{}, []}, + Groups0 + ), + State1 = State0#{groups => Groups1}, + {lists:concat(Events), State1}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +this_agent() -> self(). + +send_to_subscription_after(Group) -> + fun(Time, Msg) -> + emqx_persistent_session_ds_shared_subs_agent:send_after( + Time, + self(), + #message_to_group_sm{group = Group, message = Msg} + ) + end. + +with_group_sm(State, Group, Fun) -> + case State of + #{groups := #{Group := GSM0} = Groups} -> + GSM1 = Fun(GSM0), + State#{groups => Groups#{Group => GSM1}}; + _ -> + %% TODO + %% Error? + State + end. 
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl
new file mode 100644
index 000000000..c6bdf9d93
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl
@@ -0,0 +1,282 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc State machine for a single subscription of a shared subscription agent.
+%% Implements GSFSM described in
+%% https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md
+%%
+%% `group_sm` stands for "group state machine".
+%% The state machine is a plain map owned by the agent (no process of its own):
+%% every exported function takes the current map and returns the updated one.
+-module(emqx_ds_shared_sub_group_sm).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-export([
+    new/1,
+
+    %% Leader messages
+    handle_leader_lease_streams/3,
+    handle_leader_renew_stream_lease/2,
+
+    %% Self-initiated messages
+    handle_info/2,
+
+    %% API
+    fetch_stream_events/1
+]).
+
+%% Arguments of `new/1'.
+%% `send_after' schedules a message that must later be fed back to `handle_info/2'.
+-type options() :: #{
+    agent := emqx_ds_shared_sub_proto:agent(),
+    topic_filter := emqx_persistent_session_ds:share_topic_filter(),
+    send_after := fun((non_neg_integer(), term()) -> reference())
+}.
+
+%% Subscription states
+
+-define(connecting, connecting).
+-define(replaying, replaying).
+-define(updating, updating).
+
+-type state() :: ?connecting | ?replaying | ?updating.
+
+%% All keys are mandatory: `new/1' only returns the map after `transition/3'
+%% has filled in the `state*' keys, and every function in this module
+%% matches on them with `:='.
+-type group_sm() :: #{
+    topic_filter := emqx_persistent_session_ds:share_topic_filter(),
+    agent := emqx_ds_shared_sub_proto:agent(),
+    send_after := fun((non_neg_integer(), term()) -> reference()),
+
+    state := state(),
+    %% Data private to the current state; replaced on every transition.
+    state_data := map(),
+    %% Timer name => #timer{}; cleared on every transition.
+    state_timers := map()
+}.
+
+%% Message scheduled through `send_after' and delivered back via `handle_info/2'.
+%% `id' is compared with the `id' of the #timer{} stored under `name'
+%% to recognise time-outs of timers that were re-armed or cancelled meanwhile.
+-record(state_timeout, {
+    id :: reference(),
+    name :: atom(),
+    message :: term()
+}).
+%% Bookkeeping for an armed timer: `ref' is what `send_after' returned
+%% (used for cancellation), `id' identifies this particular arming.
+-record(timer, {
+    ref :: reference(),
+    id :: reference()
+}).
+
+%%-----------------------------------------------------------------------
+%% Constants
+%%-----------------------------------------------------------------------
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12574
+%% Move to settings
+
+%% How long to wait for a leader to answer before asking the registry again.
+-define(FIND_LEADER_TIMEOUT, 1000).
+%% How long a lease stays valid without a renewal from the leader.
+%% Must be longer than the leader's renewal period
+%% (RENEW_LEASE_INTERVAL in emqx_ds_shared_sub_leader, 5000 ms),
+%% otherwise the lease expires between two regular renewals.
+%% Three renewal periods tolerate a couple of lost/late renewals.
+-define(RENEW_LEASE_TIMEOUT, 15000).
+
+%%-----------------------------------------------------------------------
+%% API
+%%-----------------------------------------------------------------------
+
+%% Create a state machine for one shared topic filter.
+%% The machine starts in the `connecting' state, whose enter callback
+%% immediately asks the registry for the group leader.
+-spec new(options()) -> group_sm().
+new(#{
+    agent := Agent,
+    topic_filter := ShareTopicFilter,
+    send_after := SendAfter
+}) ->
+    ?SLOG(
+        info,
+        #{
+            msg => group_sm_new,
+            agent => Agent,
+            topic_filter => ShareTopicFilter
+        }
+    ),
+    GSM0 = #{
+        topic_filter => ShareTopicFilter,
+        agent => Agent,
+        send_after => SendAfter
+    },
+    transition(GSM0, ?connecting, #{}).
+
+%% Take the stream lease events accumulated so far.
+%% Events are only produced in the `replaying' state; each one is tagged
+%% with this machine's topic filter and the internal queue is emptied.
+%% In any other state there is nothing to report.
+-spec fetch_stream_events(group_sm()) -> {group_sm(), [map()]}.
+fetch_stream_events(
+    #{
+        state := ?replaying,
+        topic_filter := TopicFilter,
+        state_data := #{stream_lease_events := Events0} = Data
+    } = GSM
+) ->
+    Events1 = lists:map(
+        fun(Event) ->
+            Event#{topic_filter => TopicFilter}
+        end,
+        Events0
+    ),
+    {
+        GSM#{
+            state_data => Data#{stream_lease_events => []}
+        },
+        Events1
+    };
+fetch_stream_events(GSM) ->
+    {GSM, []}.
+
+%%-----------------------------------------------------------------------
+%% Event Handlers
+%%-----------------------------------------------------------------------
+
+%%-----------------------------------------------------------------------
+%% Connecting state
+
+%% Enter callback of `connecting': ask the registry for the leader and
+%% arm the retry timer in case no lease arrives.
+handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
+    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
+    ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
+
+%% The leader granted streams while we were looking for it: remember the
+%% iterator of every stream, queue one `lease' event per stream and start
+%% replaying. Both results are produced in a single pass over the input.
+handle_leader_lease_streams(
+    #{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version
+) ->
+    ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
+    {LeaseEvents, IteratorsByStream} = lists:mapfoldl(
+        fun(#{stream := Stream, iterator := It}, Acc) ->
+            {#{type => lease, stream => Stream, iterator => It}, Acc#{Stream => It}}
+        end,
+        #{},
+        StreamProgresses
+    ),
+    ReplayingData = #{
+        streams => IteratorsByStream,
+        stream_lease_events => LeaseEvents,
+        prev_version => undefined,
+        version => Version
+    },
+    transition(GSM, ?replaying, ReplayingData);
+%% A lease received in any other state is ignored.
+handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
+    GSM.
+
+%% No lease arrived in time: ask the registry again and re-arm the timer.
+handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) ->
+    ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
+    ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
+
+%%-----------------------------------------------------------------------
+%% Replaying state
+
+%% Enter callback of `replaying': start waiting for lease renewals.
+handle_replaying(GSM) ->
+    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
+
+%% A renewal restarts the lease timer only when we are replaying and the
+%% leader's version is exactly the one we hold; otherwise it is ignored.
+handle_leader_renew_stream_lease(
+    #{state := ?replaying, state_data := #{version := OurVersion}} = GSM, LeaderVersion
+) when LeaderVersion =:= OurVersion ->
+    ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
+handle_leader_renew_stream_lease(GSM, _LeaderVersion) ->
+    GSM.
+
+%% The lease was not renewed in time: start over from `connecting'.
+handle_renew_lease_timeout(GSM) ->
+    ?tp(debug, renew_lease_timeout, #{}),
+    transition(GSM, ?connecting, #{}).
+
+%%-----------------------------------------------------------------------
+%% Updating state
+
+% handle_updating(GSM) ->
+%     GSM.
+
+%%-----------------------------------------------------------------------
+%% Internal API
+%%-----------------------------------------------------------------------
+
+%% Route a (non-stale) state time-out to the handler of the current state.
+%% Only the two combinations below exist.
+handle_state_timeout(
+    #{state := ?connecting, topic_filter := TopicFilter} = GSM, find_leader_timeout, _Message
+) ->
+    ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
+    handle_find_leader_timeout(GSM);
+handle_state_timeout(#{state := ?replaying} = GSM, renew_lease_timeout, _Message) ->
+    handle_renew_lease_timeout(GSM).
+
+%% Entry point for messages the state machine scheduled for itself.
+%% A time-out is acted upon only if a timer with the same name is still
+%% registered AND carries the same id; anything else is dropped.
+handle_info(
+    #{state_timers := Timers} = GSM,
+    #state_timeout{id = Id, name = Name, message = Message}
+) ->
+    case maps:find(Name, Timers) of
+        {ok, #timer{id = Id}} ->
+            handle_state_timeout(GSM, Name, Message);
+        _StaleOrUnknown ->
+            GSM
+    end;
+handle_info(GSM, _Info) ->
+    GSM.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Switch to NextState: cancel every timer of the old state, install the
+%% new state with its data and an empty timer table, then run the new
+%% state's enter callback.
+transition(GSM, NextState, NextStateData) ->
+    ActiveTimerNames = maps:keys(maps:get(state_timers, GSM, #{})),
+    Stopped = lists:foldl(
+        fun(TimerName, Acc) -> cancel_timer(Acc, TimerName) end,
+        GSM,
+        ActiveTimerNames
+    ),
+    run_enter_callback(Stopped#{
+        state => NextState,
+        state_data => NextStateData,
+        state_timers => #{}
+    }).
+
+%% Same as ensure_state_timeout/4 with the timer name used as the message.
+ensure_state_timeout(GSM, Name, Delay) ->
+    ensure_state_timeout(GSM, Name, Delay, Name).
+
+%% (Re)arm the timer called Name: a previous timer with that name is
+%% cancelled, and a fresh id makes its possibly in-flight message stale.
+ensure_state_timeout(GSM, Name, Delay, Message) ->
+    Id = make_ref(),
+    Cleared = cancel_timer(GSM, Name),
+    Timers = maps:get(state_timers, Cleared),
+    TimerRef = send_after(
+        Cleared,
+        Delay,
+        #state_timeout{id = Id, name = Name, message = Message}
+    ),
+    Cleared#{state_timers := Timers#{Name => #timer{ref = TimerRef, id = Id}}}.
+
+%% Schedule Message through the callback supplied by the owner in new/1.
+send_after(#{send_after := Schedule}, Delay, Message) ->
+    Schedule(Delay, Message).
+
+%% Cancel the timer registered under Name, if any, and forget it.
+%% A state machine without such a timer is returned unchanged.
+cancel_timer(GSM, Name) ->
+    case maps:get(state_timers, GSM, #{}) of
+        #{Name := #timer{ref = Ref}} = Active ->
+            _ = erlang:cancel_timer(Ref),
+            GSM#{state_timers := maps:remove(Name, Active)};
+        _NoSuchTimer ->
+            GSM
+    end.
+
+%% Run the enter callback of the state just installed by transition/3.
+run_enter_callback(#{state := ?connecting} = GSM) ->
+    handle_connecting(GSM);
+run_enter_callback(#{state := ?replaying} = GSM) ->
+    handle_replaying(GSM).
+% run_enter_callback(#{state := ?updating} = GSM) ->
+%     handle_updating(GSM).
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl
new file mode 100644
index 000000000..5323595cf
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl
@@ -0,0 +1,326 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_leader).
+
+-behaviour(gen_statem).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_persistent_message.hrl").
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([
+    register/2,
+
+    start_link/1,
+    child_spec/1,
+    id/1,
+
+    callback_mode/0,
+    init/1,
+    handle_event/4,
+    terminate/3
+]).
+
+-type options() :: #{
+    topic_filter := emqx_persistent_session_ds:share_topic_filter()
+}.
+
+-type stream_assignment() :: #{
+    prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
+    version := emqx_ds_shared_sub_proto:version(),
+    streams := list(emqx_ds:stream())
+}.
+
+-type data() :: #{
+    group := emqx_types:group(),
+    topic := emqx_types:topic(),
+    %% For ds router, not an actual session_id
+    router_id := binary(),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12307
+    %% Persist progress
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12575
+    %% Implement some stats to assign evenly?
+    stream_progresses := #{
+        emqx_ds:stream() => emqx_ds:iterator()
+    },
+    agent_stream_assignments := #{
+        emqx_ds_shared_sub_proto:agent() => stream_assignment()
+    },
+    stream_assignments := #{
+        emqx_ds:stream() => emqx_ds_shared_sub_proto:agent()
+    }
+}.
+
+-export_type([
+    options/0,
+    data/0
+]).
+
+%% States
+
+-define(waiting_registration, waiting_registration).
+-define(replaying, replaying).
+
+%% Events
+
+-record(register, {
+    register_fun :: fun(() -> pid())
+}).
+-record(renew_streams, {}).
+-record(renew_leases, {}).
+
+%% Constants
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12574
+%% Move to settings
+-define(RENEW_LEASE_INTERVAL, 5000).
+-define(RENEW_STREAMS_INTERVAL, 5000).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Synchronously ask the leader process to run RegisterFun.
+%% The reply is built by the `waiting_registration' clause of handle_event/4.
+register(LeaderPid, RegisterFun) ->
+    Request = #register{register_fun = RegisterFun},
+    gen_statem:call(LeaderPid, Request).
+
+%%--------------------------------------------------------------------
+%% Internal API
+%%--------------------------------------------------------------------
+
+%% Supervisor child specification: a temporary worker identified by id/1.
+child_spec(#{topic_filter := TopicFilter} = Options) ->
+    ChildId = id(TopicFilter),
+    StartMFA = {?MODULE, start_link, [Options]},
+    #{
+        id => ChildId,
+        start => StartMFA,
+        restart => temporary,
+        shutdown => 5000,
+        type => worker
+    }.
+
+start_link(Options) ->
+    gen_statem:start_link(?MODULE, [Options], []).
+
+%% Child id of the leader; built from the group name of the share filter.
+id(#share{group = Group}) ->
+    {?MODULE, Group}.
+
+%%--------------------------------------------------------------------
+%% gen_statem callbacks
+%%--------------------------------------------------------------------
+
+callback_mode() ->
+    [handle_event_function, state_enter].
+
+%% Start with no known streams and no agents, waiting for registration.
+init([#{topic_filter := #share{group = Group, topic = Topic}}]) ->
+    InitialData = #{
+        group => Group,
+        topic => Topic,
+        router_id => router_id(),
+        stream_progresses => #{},
+        stream_assignments => #{},
+        agent_stream_assignments => #{}
+    },
+    {ok, ?waiting_registration, InitialData}.
+
+%%--------------------------------------------------------------------
+%% waiting_registration state
+
+%% Run the registration fun supplied by the caller. If it names this very
+%% process, we are the leader and start replaying; if it names another
+%% pid, reply with that pid and stop normally.
+handle_event({call, From}, #register{register_fun = Fun}, ?waiting_registration, Data) ->
+    Self = self(),
+    case Fun() of
+        Self ->
+            {next_state, ?replaying, Data, {reply, From, {ok, Self}}};
+        OtherPid ->
+            {stop_and_reply, normal, {reply, From, {ok, OtherPid}}}
+    end;
+%%--------------------------------------------------------------------
+%% replaying state
+
+%% On entering `replaying': add the route and start the two periodic jobs.
+%% The jobs use named generic time-outs (`{timeout, Name}'), NOT
+%% `state_timeout': gen_statem has a single state time-out timer, so a
+%% second `state_timeout' action replaces the first one and only one of
+%% the two jobs would ever run.
+handle_event(enter, _OldState, ?replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
+    ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
+    {keep_state_and_data, [
+        {{timeout, renew_leases}, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
+        {{timeout, renew_streams}, 0, #renew_streams{}}
+    ]};
+%% Periodic stream discovery; re-arms its own timer.
+handle_event({timeout, renew_streams}, #renew_streams{}, ?replaying, Data0) ->
+    Data1 = renew_streams(Data0),
+    {keep_state, Data1, {{timeout, renew_streams}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
+%% Periodic lease renewal for all connected agents; re-arms its own timer.
+handle_event({timeout, renew_leases}, #renew_leases{}, ?replaying, Data0) ->
+    Data1 = renew_leases(Data0),
+    {keep_state, Data1, {{timeout, renew_leases}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
+handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?replaying, Data0) ->
+    Data1 = connect_agent(Data0, Agent),
+    {keep_state, Data1};
+handle_event(
+    info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), ?replaying, Data0
+) ->
+    Data1 = update_agent_stream_states(Data0, Agent, StreamProgresses, Version),
+    {keep_state, Data1};
+%%--------------------------------------------------------------------
+%% fallback
+
+%% State enter calls of states without an enter action.
+handle_event(enter, _OldState, _State, _Data) ->
+    keep_state_and_data;
+%% Anything else is logged and dropped.
+handle_event(Event, _Content, State, _Data) ->
+    ?SLOG(warning, #{
+        msg => unexpected_event,
+        event => Event,
+        state => State
+    }),
+    keep_state_and_data.
+
+%% Remove the route when the leader goes away.
+terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
+    ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId),
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Discovers the streams currently matching the leader's topic and creates an
+%% iterator for every stream that has no recorded progress yet. Progress of
+%% already known streams is left untouched.
+%%
+%% The same `StartTime' sample is used both for stream discovery and for the
+%% new iterators, so the two calls cannot disagree about the starting point.
+%% NOTE(review): `StartTime' is re-sampled on every renewal, so a stream that
+%% is discovered late gets an iterator starting at discovery time -- confirm
+%% this is intended.
+renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
+    TopicFilter = emqx_topic:words(Topic),
+    StartTime = now_ms(),
+    {_, Streams} = lists:unzip(
+        emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
+    ),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    %% Handle stream removal
+    NewProgresses = lists:foldl(
+        fun(Stream, ProgressesAcc) ->
+            case ProgressesAcc of
+                #{Stream := _} ->
+                    ProgressesAcc;
+                _ ->
+                    {ok, It} = emqx_ds:make_iterator(
+                        ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
+                    ),
+                    ProgressesAcc#{Stream => It}
+            end
+        end,
+        Progresses,
+        Streams
+    ),
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+    %% Initiate reassignment
+    ?SLOG(info, #{
+        msg => leader_renew_streams,
+        topic_filter => TopicFilter,
+        streams => length(Streams)
+    }),
+    Data0#{stream_progresses => NewProgresses}.
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12572
+%% This just gives unassigned streams to the connecting agent,
+%% we need to implement actual stream (re)assignment.
+connect_agent(
+    #{
+        group := Group,
+        agent_stream_assignments := AgentAssignments,
+        stream_assignments := StreamOwners,
+        stream_progresses := Iterators
+    } = Data,
+    Agent
+) ->
+    ?SLOG(info, #{
+        msg => leader_agent_connected,
+        agent => Agent,
+        group => Group
+    }),
+    case is_map_key(Agent, AgentAssignments) of
+        true ->
+            %% Already connected agent: nothing is leased again.
+            Data;
+        false ->
+            %% New agent: hand over every stream nobody owns yet, as version 0.
+            Streams = unassigned_streams(Data),
+            InitialVersion = 0,
+            Lease = [
+                #{stream => Stream, iterator => maps:get(Stream, Iterators)}
+             || Stream <- Streams
+            ],
+            ?SLOG(info, #{
+                msg => leader_lease_streams,
+                agent => Agent,
+                group => Group,
+                streams => length(Lease),
+                version => InitialVersion
+            }),
+            ok = emqx_ds_shared_sub_proto:leader_lease_streams(
+                Agent, Group, Lease, InitialVersion
+            ),
+            NewAssignment = #{
+                prev_version => undefined,
+                version => InitialVersion,
+                streams => Streams
+            },
+            NewOwners = maps:from_list([{Stream, Agent} || Stream <- Streams]),
+            Data#{
+                agent_stream_assignments => AgentAssignments#{Agent => NewAssignment},
+                stream_assignments => maps:merge(StreamOwners, NewOwners)
+            }
+    end.
+
+%% Sends a lease renewal, tagged with the agent's current assignment version,
+%% to every connected agent. The leader data is returned unchanged.
+renew_leases(#{group := Group, agent_stream_assignments := AgentAssignments} = Data) ->
+    ok = maps:fold(
+        fun(Agent, #{version := Version}, ok) ->
+            emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version)
+        end,
+        ok,
+        AgentAssignments
+    ),
+    Data.
+
+%% Stores the iterators reported by `Agent', provided the report carries the
+%% agent's current or previous assignment version. Reports with any other
+%% version are ignored.
+update_agent_stream_states(
+    #{
+        agent_stream_assignments := AgentAssignments,
+        stream_assignments := StreamOwners,
+        stream_progresses := Progresses
+    } = Data,
+    Agent,
+    AgentStreamProgresses,
+    Version
+) ->
+    CurrentVersion = emqx_utils_maps:deep_get([Agent, version], AgentAssignments, undefined),
+    PreviousVersion = emqx_utils_maps:deep_get(
+        [Agent, prev_version], AgentAssignments, undefined
+    ),
+    if
+        CurrentVersion == Version; PreviousVersion == Version ->
+            StoreProgress = fun(#{stream := Stream, iterator := Iterator}, Acc) ->
+                %% Assert Stream is assigned to Agent
+                Agent = maps:get(Stream, StreamOwners),
+                Acc#{Stream => Iterator}
+            end,
+            Data#{
+                stream_progresses => lists:foldl(StoreProgress, Progresses, AgentStreamProgresses)
+            };
+        true ->
+            %% TODO https://emqx.atlassian.net/browse/EMQX-12572
+            %% send invalidate to agent
+            Data
+    end.
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+%% Fresh hex-encoded GUID, used as this leader's router id.
+router_id() ->
+    Guid = emqx_guid:gen(),
+    emqx_guid:to_hexstr(Guid).
+
+%% Current system time in milliseconds.
+now_ms() ->
+    erlang:system_time(millisecond).
+
+%% Streams with recorded progress that are not assigned to any agent.
+unassigned_streams(#{stream_progresses := StreamProgresses, stream_assignments := StreamAssignments}) ->
+    [
+        Stream
+     || Stream <- maps:keys(StreamProgresses),
+        not is_map_key(Stream, StreamAssignments)
+    ].
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl
new file mode 100644
index 000000000..d511fde24
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_sup.erl
@@ -0,0 +1,59 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Supervisor for shared subscription leader processes. It starts with no
+%% children; leaders are added and removed dynamically.
+-module(emqx_ds_shared_sub_leader_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([
+    start_link/0,
+    child_spec/0,
+
+    start_leader/1,
+    stop_leader/1
+]).
+
+%% supervisor behaviour callbacks
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% Starts the supervisor, registered locally under the module name.
+-spec start_link() -> supervisor:startlink_ret().
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% Child spec for running this supervisor under a parent supervisor.
+%% `shutdown' is `infinity' because the child is itself a supervisor: with a
+%% finite timeout the parent may kill it before it has terminated its leaders,
+%% so their `terminate/3' callbacks would not run.
+-spec child_spec() -> supervisor:child_spec().
+child_spec() ->
+    #{
+        id => ?MODULE,
+        start => {?MODULE, start_link, []},
+        restart => permanent,
+        shutdown => infinity,
+        type => supervisor
+    }.
+
+%% Starts a leader child built from `Options'.
+-spec start_leader(emqx_ds_shared_sub_leader:options()) -> supervisor:startchild_ret().
+start_leader(Options) ->
+    ChildSpec = emqx_ds_shared_sub_leader:child_spec(Options),
+    supervisor:start_child(?MODULE, ChildSpec).
+
+%% Terminates the leader child whose id is derived from `TopicFilter'.
+-spec stop_leader(emqx_persistent_session_ds:share_topic_filter()) -> ok | {error, term()}.
+stop_leader(TopicFilter) ->
+    supervisor:terminate_child(?MODULE, emqx_ds_shared_sub_leader:id(TopicFilter)).
+
+%%------------------------------------------------------------------------------
+%% supervisor behaviour callbacks
+%%------------------------------------------------------------------------------
+
+%% `one_for_one' with no static children.
+init([]) ->
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    ChildSpecs = [],
+    {ok, {SupFlags, ChildSpecs}}.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl
new file mode 100644
index 000000000..d9a0b994f
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl
@@ -0,0 +1,72 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% TODO https://emqx.atlassian.net/browse/EMQX-12573
+%% This should be wrapped with a proto_v1 module.
+%% For simplicity, send as simple OTP messages for now.
+
+-module(emqx_ds_shared_sub_proto).
+
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([
+    agent_connect_leader/3,
+    agent_update_stream_states/4,
+
+    leader_lease_streams/4,
+    leader_renew_stream_lease/3
+]).
+
+-type agent() :: pid().
+-type leader() :: pid().
+-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
+-type group() :: emqx_types:group().
+-type version() :: non_neg_integer().
+
+-type stream_progress() :: #{
+    stream := emqx_ds:stream(),
+    iterator := emqx_ds:iterator()
+}.
+
+-export_type([
+    agent/0,
+    leader/0,
+    group/0,
+    version/0,
+    stream_progress/0
+]).
+
+%% agent -> leader messages
+
+%% Asks the leader to accept `FromAgent' for `TopicFilter'. Fire-and-forget.
+-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
+agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
+    Message = ?agent_connect_leader(FromAgent, TopicFilter),
+    ToLeader ! Message,
+    ok.
+
+%% Reports the agent's stream iterators, tagged with its assignment version.
+-spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok.
+agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
+    Message = ?agent_update_stream_states(FromAgent, StreamProgresses, Version),
+    ToLeader ! Message,
+    ok.
+
+%% ...
+
+%% leader -> agent messages
+
+%% Leases `Streams' of group `OfGroup' to the agent under `Version'.
+-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
+leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->
+    Message = ?leader_lease_streams(OfGroup, Streams, Version),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Message),
+    ok.
+
+%% Renews the agent's lease in group `OfGroup' for `Version'.
+-spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
+leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
+    Message = ?leader_renew_stream_lease(OfGroup, Version),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(ToAgent, Message),
+    ok.
+
+%% ...
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl
new file mode 100644
index 000000000..c780ab193
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl
@@ -0,0 +1,85 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% @doc Asynchronous messages between shared sub agent and shared sub leader
+%% These messages are instantiated on the receiver's side, so they do not
+%% travel over the network.
+%%
+%% Every message is a map tagged by its `type' key. Each message has two
+%% macros: `?name(...)' builds the map (`=>'), `?name_match(...)' is the
+%% corresponding pattern (`:=') for use in function heads and `receive'.
+
+-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL).
+-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true).
+
+%% NOTE
+%% We do not need any kind of request/response identification,
+%% because the protocol is fully event-based.
+
+%% agent messages, sent from agent side to the leader
+%% NOTE(review): the two `*_timeout_msg' tags below have no constructor macro
+%% in this header; presumably they are timer events local to the agent rather
+%% than messages sent to the leader -- confirm.
+
+-define(agent_connect_leader_msg, agent_connect_leader).
+-define(agent_update_stream_states_msg, agent_update_stream_states).
+-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
+-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
+
+%% Agent messages sent to the leader.
+%% Leader talks to many agents, `agent` field is used to identify the sender.
+
+%% Request from `Agent' to join the leader serving `TopicFilter'.
+-define(agent_connect_leader(Agent, TopicFilter), #{
+    type => ?agent_connect_leader_msg,
+    topic_filter => TopicFilter,
+    agent => Agent
+}).
+
+-define(agent_connect_leader_match(Agent, TopicFilter), #{
+    type := ?agent_connect_leader_msg,
+    topic_filter := TopicFilter,
+    agent := Agent
+}).
+
+%% Progress report from `Agent', carrying its stream states and `Version'.
+-define(agent_update_stream_states(Agent, StreamStates, Version), #{
+    type => ?agent_update_stream_states_msg,
+    stream_states => StreamStates,
+    version => Version,
+    agent => Agent
+}).
+
+%% Pattern counterpart of the agent stream-state report message.
+-define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
+    type := ?agent_update_stream_states_msg,
+    stream_states := StreamStates,
+    version := Version,
+    agent := Agent
+}).
+
+%% leader messages, sent from the leader to the agent
+%% Agent may have several shared subscriptions, so may talk to several leaders
+%% `group` field is used to identify the leader.
+
+-define(leader_lease_streams_msg, leader_lease_streams).
+-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
+
+%% Lease of `Streams' in `Group', tagged with `Version'.
+-define(leader_lease_streams(Group, Streams, Version), #{
+    type => ?leader_lease_streams_msg,
+    streams => Streams,
+    version => Version,
+    group => Group
+}).
+
+-define(leader_lease_streams_match(Group, Streams, Version), #{
+    type := ?leader_lease_streams_msg,
+    streams := Streams,
+    version := Version,
+    group := Group
+}).
+
+%% Lease renewal for `Group', tagged with `Version'.
+-define(leader_renew_stream_lease(Group, Version), #{
+    type => ?leader_renew_stream_lease_msg,
+    version => Version,
+    group => Group
+}).
+
+-define(leader_renew_stream_lease_match(Group, Version), #{
+    type := ?leader_renew_stream_lease_msg,
+    version := Version,
+    group := Group
+}).
+
+-endif.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl
new file mode 100644
index 000000000..9b4a6bd11
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl
@@ -0,0 +1,111 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+%% Locally registered gen_server that resolves (starting it if needed) the
+%% leader for a shared topic filter and connects the requesting agent to it.
+-module(emqx_ds_shared_sub_registry).
+
+-behaviour(gen_server).
+
+-include_lib("emqx/include/logger.hrl").
+
+%% Supervision entry points and gen_server callbacks
+-export([
+    start_link/0,
+    child_spec/0,
+
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+
+%% API
+-export([
+    lookup_leader/2
+]).
+
+%% Cast payload: an agent asking for the leader of a topic filter.
+-record(lookup_leader, {
+    agent :: emqx_ds_shared_sub_proto:agent(),
+    topic_filter :: emqx_persistent_session_ds:share_topic_filter()
+}).
+
+%% gproc key for a node-local unique name.
+-define(gproc_id(ID), {n, l, ID}).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+%% Asynchronously asks the registry to find (or start) the leader for
+%% `TopicFilter' and to connect `Agent' to it. Returns immediately.
+-spec lookup_leader(
+    emqx_ds_shared_sub_proto:agent(), emqx_persistent_session_ds:share_topic_filter()
+) -> ok.
+lookup_leader(Agent, TopicFilter) ->
+    Request = #lookup_leader{agent = Agent, topic_filter = TopicFilter},
+    gen_server:cast(?MODULE, Request).
+
+%%--------------------------------------------------------------------
+%% Internal API
+%%--------------------------------------------------------------------
+
+%% Starts the registry, registered locally under the module name.
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% Child spec for running the registry as a permanent worker.
+child_spec() ->
+    StartMFA = {?MODULE, start_link, []},
+    #{
+        id => ?MODULE,
+        start => StartMFA,
+        restart => permanent,
+        shutdown => 5000,
+        type => worker
+    }.
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+%% The registry keeps no state of its own; an empty map is threaded through.
+init([]) ->
+    {ok, #{}}.
+
+%% No synchronous requests are supported.
+handle_call(_Request, _From, State) ->
+    {reply, {error, unknown_request}, State}.
+
+%% Resolves the leader for the requested topic filter and connects the agent.
+handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) ->
+    {noreply, do_lookup_leader(Agent, TopicFilter, State)}.
+
+%% Stray messages are dropped.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+%% Finds the leader registered in gproc for `TopicFilter', starting and
+%% registering a new one when none is registered, then sends the agent's
+%% connect request to it. The registry state is returned unchanged.
+do_lookup_leader(Agent, TopicFilter, State) ->
+    %% TODO https://emqx.atlassian.net/browse/EMQX-12309
+    %% Cluster-wide unique leader election should be implemented
+    LeaderId = emqx_ds_shared_sub_leader:id(TopicFilter),
+    LeaderPid =
+        case gproc:where(?gproc_id(LeaderId)) of
+            undefined -> start_and_register_leader(LeaderId, TopicFilter);
+            ExistingPid -> ExistingPid
+        end,
+    ?SLOG(info, #{
+        msg => lookup_leader,
+        agent => Agent,
+        topic_filter => TopicFilter,
+        leader => LeaderPid
+    }),
+    ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter),
+    State.
+
+%% Starts a leader under the leader supervisor and has it register itself in
+%% gproc. Returns the pid reported by the registration call, which is the pid
+%% that holds the gproc name.
+start_and_register_leader(LeaderId, TopicFilter) ->
+    {ok, StartedPid} = emqx_ds_shared_sub_leader_sup:start_leader(#{
+        topic_filter => TopicFilter
+    }),
+    RegisterFun = fun() ->
+        {RegisteredPid, _} = gproc:reg_or_locate(?gproc_id(LeaderId)),
+        RegisteredPid
+    end,
+    {ok, LeaderPid} = emqx_ds_shared_sub_leader:register(StartedPid, RegisterFun),
+    LeaderPid.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl
index dd6100f49..137f0680a 100644
--- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl
+++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl
@@ -29,5 +29,8 @@ init([]) ->
         intensity => 10,
         period => 10
     },
-    ChildSpecs = [],
+    ChildSpecs = [
+        emqx_ds_shared_sub_registry:child_spec(),
+        emqx_ds_shared_sub_leader_sup:child_spec()
+    ],
     {ok, {SupFlags, ChildSpecs}}.
diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
new file mode 100644
index 000000000..bca8eb0eb
--- /dev/null
+++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl
@@ -0,0 +1,165 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+%% Starts `emqx' with durable sessions enabled on the builtin message storage
+%% backend, together with the `emqx_ds_shared_sub' application.
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, #{
+                config => #{
+                    <<"durable_sessions">> => #{
+                        <<"enable">> => true,
+                        <<"renew_streams_interval">> => "100ms"
+                    },
+                    <<"durable_storage">> => #{
+                        <<"messages">> => #{
+                            <<"backend">> => <<"builtin">>
+                        }
+                    }
+                }
+            }},
+            emqx_ds_shared_sub
+        ],
+        #{work_dir => ?config(priv_dir, Config)}
+    ),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(apps, Config)),
+    ok.
+
+%% Every test case runs with snabbkaffe tracing enabled.
+init_per_testcase(_TC, Config) ->
+    ok = snabbkaffe:start_trace(),
+    Config.
+
+%% Stops tracing and restarts the leader supervisor so that no leader
+%% survives into the next test case.
+end_per_testcase(_TC, _Config) ->
+    ok = snabbkaffe:stop(),
+    ok = terminate_leaders(),
+    ok.
+
+%% A shared subscriber receives a message published after it subscribed.
+t_lease_initial(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    %% Need to pre-create some streams for the topic used below.
+    %% The leader is still a dummy: it won't update streams after the first
+    %% lease to the agent.
+    %% So there should be some streams already when the agent connects.
+    ok = init_streams(ConnPub, <<"topic1/1">>),
+
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+    {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1),
+
+    {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1),
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
+
+    ok = emqtt:disconnect(ConnShared),
+    ok = emqtt:disconnect(ConnPub).
+
+%% The agent keeps retrying while the registry is down and obtains a lease
+%% once the registry is back.
+t_lease_reconnect(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    %% Need to pre-create some streams for the topic used below.
+    %% The leader is still a dummy: it won't update streams after the first
+    %% lease to the agent.
+    %% So there should be some streams already when the agent connects.
+    ok = init_streams(ConnPub, <<"topic2/2">>),
+
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+
+    %% Stop the registry to simulate the inability to find a leader.
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+
+    ?assertWaitEvent(
+        {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
+        #{?snk_kind := find_leader_timeout},
+        5_000
+    ),
+
+    %% Start registry, agent should retry after some time and find the leader.
+    ?assertWaitEvent(
+        {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+        #{?snk_kind := leader_lease_streams},
+        5_000
+    ),
+
+    ct:sleep(1_000),
+    {ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1),
+
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
+
+    ok = emqtt:disconnect(ConnShared),
+    ok = emqtt:disconnect(ConnPub).
+
+%% After the leaders are killed, a new `leader_lease_streams' event must be
+%% preceded by a `renew_lease_timeout' event.
+t_renew_lease_timeout(_Config) ->
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+
+    ?assertWaitEvent(
+        {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr3/topic3/#">>, 1),
+        #{?snk_kind := leader_lease_streams},
+        5_000
+    ),
+
+    ?check_trace(
+        ?wait_async_action(
+            ok = terminate_leaders(),
+            #{?snk_kind := leader_lease_streams},
+            5_000
+        ),
+        fun(Trace) ->
+            ?strict_causality(
+                #{?snk_kind := renew_lease_timeout},
+                #{?snk_kind := leader_lease_streams},
+                Trace
+            )
+        end
+    ),
+
+    ok = emqtt:disconnect(ConnShared).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+%%--------------------------------------------------------------------
+
+%% Publishes one message to `Topic' and waits until a regular (non-shared)
+%% subscriber has received it, then disconnects that subscriber.
+init_streams(ConnPub, Topic) ->
+    ConnRegular = emqtt_connect_sub(<<"client_regular">>),
+    {ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1),
+    {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
+
+    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
+
+    ok = emqtt:disconnect(ConnRegular).
+
+%% Connects an MQTT v5 client with a clean start and a session expiry
+%% interval of 7200 seconds. Returns the client pid.
+emqtt_connect_sub(ClientId) ->
+    Options = [
+        {client_id, ClientId},
+        {clean_start, true},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 7_200}}
+    ],
+    {ok, Client} = emqtt:start_link(Options),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+%% Connects an MQTT v5 client with a clean start and no session expiry
+%% property. Returns the client pid.
+emqtt_connect_pub(ClientId) ->
+    Options = [
+        {client_id, ClientId},
+        {clean_start, true},
+        {proto_ver, v5}
+    ],
+    {ok, Client} = emqtt:start_link(Options),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+%% Restarts the leader supervisor under the application supervisor.
+terminate_leaders() ->
+    Sup = emqx_ds_shared_sub_sup,
+    LeaderSup = emqx_ds_shared_sub_leader_sup,
+    ok = supervisor:terminate_child(Sup, LeaderSup),
+    {ok, _} = supervisor:restart_child(Sup, LeaderSup),
+    ok.