diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl index 0a3ab9b6d..acf9a5735 100644 --- a/apps/emqx/include/emqx_session.hrl +++ b/apps/emqx/include/emqx_session.hrl @@ -20,4 +20,11 @@ -define(IS_SESSION_IMPL_MEM(S), (is_tuple(S) andalso element(1, S) =:= session)). -define(IS_SESSION_IMPL_DS(S), (is_map_key(id, S))). +%% (Erlang) messages that a connection process should forward to the +%% session handler. +-record(session_message, { + message :: term() +}). +-define(session_message(MSG), #session_message{message = MSG}). + -endif. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index eb54f6ba1..b9dbe089b 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -19,6 +19,7 @@ -include("emqx.hrl"). -include("emqx_channel.hrl"). +-include("emqx_session.hrl"). -include("emqx_mqtt.hrl"). -include("emqx_access_control.hrl"). -include("logger.hrl"). @@ -1299,6 +1300,9 @@ handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) -> [] -> {ok, Channel}; Msgs -> {ok, Msgs, Channel} end; +handle_info(?session_message(Message), #channel{session = Session} = Channel) -> + NSession = emqx_session:handle_info(Message, Session), + {ok, Channel#channel{session = NSession}}; handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1177cf653..cc599eb06 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -25,6 +25,7 @@ -include("emqx_mqtt.hrl"). +-include("emqx_session.hrl"). -include("emqx_persistent_session_ds/session_internals.hrl"). -ifdef(TEST). @@ -63,6 +64,7 @@ deliver/3, replay/3, handle_timeout/3, + handle_info/2, disconnect/2, terminate/2 ]). @@ -106,6 +108,7 @@ seqno/0, timestamp/0, topic_filter/0, + share_topic_filter/0, subscription_id/0, subscription/0, session/0, @@ -117,7 +120,8 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type topic_filter() :: emqx_types:topic() | #share{}. +-type share_topic_filter() :: #share{}. +-type topic_filter() :: emqx_types:topic() | share_topic_filter(). %% Subscription and subscription states: %% @@ -155,6 +159,8 @@ subopts := map() }. +-type shared_sub_state() :: term(). + -define(TIMER_PULL, timer_pull). -define(TIMER_GET_STREAMS, timer_get_streams). -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). @@ -172,6 +178,8 @@ props := map(), %% Persistent state: s := emqx_persistent_session_ds_state:t(), + %% Shared subscription state: + shared_sub_s := shared_sub_state(), %% Buffer: inflight := emqx_persistent_session_ds_inflight:t(), %% In-progress replay: @@ -277,8 +285,11 @@ info(created_at, #{s := S}) -> emqx_persistent_session_ds_state:get_created_at(S); info(is_persistent, #{}) -> true; -info(subscriptions, #{s := S}) -> - emqx_persistent_session_ds_subs:to_map(S); +info(subscriptions, #{s := S, shared_sub_s := SharedSubS}) -> + maps:merge( + emqx_persistent_session_ds_subs:to_map(S), + emqx_persistent_session_ds_shared_subs:to_map(S, SharedSubS) + ); info(subscriptions_cnt, #{s := S}) -> emqx_persistent_session_ds_state:n_subscriptions(S); info(subscriptions_max, #{props := Conf}) -> @@ -356,15 +367,23 @@ print_session(ClientId) -> %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %%-------------------------------------------------------------------- +%% Suppress warnings about clauses handling unimplemented results +%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3` +-dialyzer({nowarn_function, subscribe/3}). -spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. subscribe( - #share{}, - _SubOpts, - _Session + #share{} = TopicFilter, + SubOpts, + Session ) -> - %% TODO: Shared subscriptions are not supported yet: - {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; + case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of + {ok, S0, SharedSubS} -> + S = emqx_persistent_session_ds_state:commit(S0), + {ok, Session#{s => S, shared_sub_s => SharedSubS}}; + Error = {error, _} -> + Error + end; subscribe( TopicFilter, SubOpts, @@ -378,8 +397,27 @@ subscribe( Error end. +%% Suppress warnings about clauses handling unimplemented results +%% of `emqx_persistent_session_ds_shared_subs:on_unsubscribe/4` +-dialyzer({nowarn_function, unsubscribe/2}). -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. +unsubscribe( + #share{} = TopicFilter, + Session = #{id := SessionId, s := S0, shared_sub_s := SharedSubS0} +) -> + case + emqx_persistent_session_ds_shared_subs:on_unsubscribe( + SessionId, TopicFilter, S0, SharedSubS0 + ) + of + {ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} -> + S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), + S = emqx_persistent_session_ds_state:commit(S2), + {ok, Session#{s => S, shared_sub_s => SharedSubS1}, SubOpts}; + Error = {error, _} -> + Error + end; unsubscribe( TopicFilter, Session = #{id := SessionId, s := S0} @@ -540,6 +578,8 @@ pubcomp(_ClientInfo, PacketId, Session0) -> end. %%-------------------------------------------------------------------- +%% Delivers +%%-------------------------------------------------------------------- -spec deliver(clientinfo(), [emqx_types:deliver()], session()) -> {ok, replies(), session()}. @@ -551,6 +591,10 @@ deliver(ClientInfo, Delivers, Session0) -> ), {ok, [], pull_now(Session)}. +%%-------------------------------------------------------------------- +%% Timeouts +%%-------------------------------------------------------------------- + -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> @@ -573,14 +617,15 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) -> handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; -handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) -> +handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) -> S1 = emqx_persistent_session_ds_subs:gc(S0), - S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), + S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), + {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0), Interval = get_config(ClientInfo, [renew_streams_interval]), Session = emqx_session:ensure_timer( ?TIMER_GET_STREAMS, Interval, - Session0#{s => S} + Session0#{s => S, shared_sub_s => SharedSubS} ), {ok, [], Session}; handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) -> @@ -601,6 +646,45 @@ handle_timeout(_ClientInfo, Timeout, Session) -> ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. +%%-------------------------------------------------------------------- +%% Generic messages +%%-------------------------------------------------------------------- + +-spec handle_info(term(), session()) -> session(). +handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}) -> + {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg), + Session#{s => S, shared_sub_s => SharedSubS}. + +%%-------------------------------------------------------------------- +%% Shared subscription outgoing messages +%%-------------------------------------------------------------------- + +shared_sub_opts(SessionId) -> + #{ + session_id => SessionId, + send_funs => #{ + send => fun send_message/2, + send_after => fun send_message_after/3 + } + }. + +send_message(Dest, Msg) -> + case Dest =:= self() of + true -> + erlang:send(Dest, ?session_message(?shared_sub_message(Msg))), + Msg; + false -> + erlang:send(Dest, Msg) + end. + +send_message_after(Time, Dest, Msg) -> + case Dest =:= self() of + true -> + erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg))); + false -> + erlang:send_after(Time, Dest, Msg) + end. + bump_last_alive(S0) -> %% Note: we take a pessimistic approach here and assume that the client will be alive %% until the next bump timeout. With this, we avoid garbage collecting this session @@ -814,13 +898,17 @@ session_open( S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), S5 = set_clientinfo(ClientInfo, S4), S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5), - S = emqx_persistent_session_ds_state:commit(S6), + {ok, S7, SharedSubS} = emqx_persistent_session_ds_shared_subs:open( + S6, shared_sub_opts(SessionId) + ), + S = emqx_persistent_session_ds_state:commit(S7), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), #{ id => SessionId, s => S, + shared_sub_s => SharedSubS, inflight => Inflight, props => #{} } @@ -869,6 +957,7 @@ session_ensure_new( id => Id, props => Conf, s => S, + shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)), inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo)) }. @@ -879,8 +968,8 @@ session_drop(SessionId, Reason) -> case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}), - emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0), - emqx_persistent_session_ds_state:delete(SessionId); + ok = emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0), + ok = emqx_persistent_session_ds_state:delete(SessionId); undefined -> ok end. @@ -917,9 +1006,12 @@ do_ensure_all_iterators_closed(_DSSessionID) -> %% Normal replay: %%-------------------------------------------------------------------- -fetch_new_messages(Session = #{s := S}, ClientInfo) -> - Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S), - fetch_new_messages(Streams, Session, ClientInfo). +fetch_new_messages(Session0 = #{s := S0}, ClientInfo) -> + Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0), + Session1 = fetch_new_messages(Streams, Session0, ClientInfo), + #{s := S1, shared_sub_s := SharedSubS0} = Session1, + {S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0), + Session1#{s => S2, shared_sub_s => SharedSubS1}. fetch_new_messages([], Session, _ClientInfo) -> Session; diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl new file mode 100644 index 000000000..c355af0a6 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -0,0 +1,379 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_shared_subs). + +-include("emqx_mqtt.hrl"). +-include("logger.hrl"). +-include("session_internals.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/4, + + on_streams_replayed/2, + on_info/3, + + renew_streams/2, + to_map/2 +]). + +-record(agent_message, { + message :: term() +}). + +-type t() :: #{ + agent := emqx_persistent_session_ds_shared_subs_agent:t() +}. +-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). +-type opts() :: #{ + session_id := emqx_persistent_session_ds:id(), + send_funs := #{ + send := fun((pid(), term()) -> term()), + send_after := fun((non_neg_integer(), pid(), term()) -> reference()) + } +}. + +-define(agent_message(Msg), #agent_message{message = Msg}). +-define(rank_x, rank_shared). +-define(rank_y, 0). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec new(opts()) -> t(). +new(Opts) -> + #{ + agent => emqx_persistent_session_ds_shared_subs_agent:new( + agent_opts(Opts) + ) + }. + +-spec open(emqx_persistent_session_ds_state:t(), opts()) -> + {ok, emqx_persistent_session_ds_state:t(), t()}. +open(S, Opts) -> + SharedSubscriptions = fold_shared_subs( + fun(#share{} = TopicFilter, Sub, Acc) -> + [{TopicFilter, to_agent_subscription(S, Sub)} | Acc] + end, + [], + S + ), + Agent = emqx_persistent_session_ds_shared_subs_agent:open( + SharedSubscriptions, agent_opts(Opts) + ), + SharedSubS = #{agent => Agent}, + {ok, S, SharedSubS}. + +-spec on_subscribe( + share_topic_filter(), + emqx_types:subopts(), + emqx_persistent_session_ds:session() +) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}. +on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) -> + Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S), + on_subscribe(Subscription, TopicFilter, SubOpts, Session). + +-spec on_unsubscribe( + emqx_persistent_session_ds:id(), + emqx_persistent_session_ds:topic_filter(), + emqx_persistent_session_ds_state:t(), + t() +) -> + {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} + | {error, emqx_types:reason_code()}. +on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) -> + case lookup(TopicFilter, S0) of + undefined -> + {error, ?RC_NO_SUBSCRIPTION_EXISTED}; + Subscription -> + ?tp(persistent_session_ds_subscription_delete, #{ + session_id => SessionId, topic_filter => TopicFilter + }), + Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe( + Agent0, TopicFilter + ), + SharedSubS = SharedSubS0#{agent => Agent1}, + S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), + {ok, S, SharedSubS, Subscription} + end. + +-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> + {emqx_persistent_session_ds_state:t(), t()}. +renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> + {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( + Agent0 + ), + NewLeasedStreams =/= [] andalso + ?SLOG( + info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams} + ), + S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams), + S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams), + SharedSubS1 = SharedSubS0#{agent => Agent1}, + {S2, SharedSubS1}. + +-spec on_streams_replayed( + emqx_persistent_session_ds_state:t(), + t() +) -> {emqx_persistent_session_ds_state:t(), t()}. +on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> + %% TODO + %% Is it sufficient for a report? + Progress = fold_shared_stream_states( + fun(TopicFilter, Stream, SRS, Acc) -> + #srs{it_begin = BeginIt} = SRS, + StreamProgress = #{ + topic_filter => TopicFilter, + stream => Stream, + iterator => BeginIt + }, + [StreamProgress | Acc] + end, + [], + S + ), + Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( + Agent0, Progress + ), + SharedSubS1 = SharedSubS0#{agent => Agent1}, + {S, SharedSubS1}. + +-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> + {emqx_persistent_session_ds_state:t(), t()}. +on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) -> + Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info), + SharedSubS1 = SharedSubS0#{agent => Agent1}, + {S, SharedSubS1}; +on_info(S, SharedSubS, _Info) -> + %% TODO + %% Log warning + {S, SharedSubS}. + +-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map(). +to_map(_S, _SharedSubS) -> + %% TODO + #{}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +fold_shared_subs(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions( + fun + (#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0); + (_, _Sub, Acc0) -> Acc0 + end, + Acc, + S + ). + +fold_shared_stream_states(Fun, Acc, S) -> + %% TODO + %% Optimize or cache + TopicFilters = fold_shared_subs( + fun + (#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) -> + Acc0#{Id => TopicFilter}; + (_, _, Acc0) -> + Acc0 + end, + #{}, + S + ), + emqx_persistent_session_ds_state:fold_streams( + fun({SubId, Stream}, SRS, Acc0) -> + case TopicFilters of + #{SubId := TopicFilter} -> + Fun(TopicFilter, Stream, SRS, Acc0); + _ -> + Acc0 + end + end, + Acc, + S + ). + +on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) -> + #{max_subscriptions := MaxSubscriptions} = Props, + case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of + true -> + create_new_subscription(TopicFilter, SubOpts, Session); + false -> + {error, ?RC_QUOTA_EXCEEDED} + end; +on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> + update_subscription(Subscription, TopicFilter, SubOpts, Session). + +-dialyzer({nowarn_function, create_new_subscription/3}). +create_new_subscription(TopicFilter, SubOpts, #{ + id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props +}) -> + case + emqx_persistent_session_ds_shared_subs_agent:on_subscribe( + Agent0, TopicFilter, SubOpts + ) + of + {ok, Agent1} -> + #{upgrade_qos := UpgradeQoS} = Props, + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{ + parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts + }, + S3 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S2 + ), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, Subscription, S3 + ), + SharedSubS = SharedSubS0#{agent => Agent1}, + ?tp(persistent_session_ds_shared_subscription_added, #{ + topic_filter => TopicFilter, session => SessionId + }), + {ok, S, SharedSubS}; + {error, _} = Error -> + Error + end. + +update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{ + s := S0, shared_sub_s := SharedSubS, props := Props +}) -> + #{upgrade_qos := UpgradeQoS} = Props, + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of + SState -> + %% Client resubscribed with the same parameters: + {ok, S0, SharedSubS}; + _ -> + %% Subsription parameters changed: + {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), + S2 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S1 + ), + Sub = Sub0#{current_state => SStateId}, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), + {ok, S, SharedSubS} + end. + +lookup(TopicFilter, S) -> + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of + Sub = #{current_state := SStateId} -> + case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of + #{subopts := SubOpts} -> + Sub#{subopts => SubOpts}; + undefined -> + undefined + end; + undefined -> + undefined + end. + +accept_stream( + #{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0 +) -> + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + undefined -> + %% This should not happen. + %% Agent should have received unsubscribe callback + %% and should not have passed this stream as a new one + error(new_stream_without_sub); + #{id := SubId, current_state := SStateId} -> + Key = {SubId, Stream}, + case emqx_persistent_session_ds_state:get_stream(Key, S0) of + undefined -> + NewSRS = + #srs{ + rank_x = ?rank_x, + rank_y = ?rank_y, + it_begin = Iterator, + it_end = Iterator, + sub_state_id = SStateId + }, + S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0), + S1; + _SRS -> + S0 + end + end. + +revoke_stream( + #{topic_filter := TopicFilter, stream := Stream}, S0 +) -> + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + undefined -> + %% This should not happen. + %% Agent should have received unsubscribe callback + %% and should not have revoked this stream + S0; + #{id := SubId} -> + Key = {SubId, Stream}, + case emqx_persistent_session_ds_state:get_stream(Key, S0) of + undefined -> + S0; + SRS0 -> + SRS1 = SRS0#srs{unsubscribed = true}, + S1 = emqx_persistent_session_ds_state:put_stream(Key, SRS1, S0), + S1 + end + end. + +-spec to_agent_subscription( + emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription() +) -> + emqx_persistent_session_ds_shared_subs_agent:subscription(). +to_agent_subscription(_S, Subscription) -> + %% TODO + %% do we need anything from sub state? + maps:with([start_time], Subscription). + +-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts(). +agent_opts(#{session_id := SessionId, send_funs := SendFuns}) -> + #{ + session_id => SessionId, + send_funs => agent_send_funs(SendFuns) + }. + +agent_send_funs(#{ + send := Send, + send_after := SendAfter +}) -> + #{ + send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end, + send_after => fun(Time, Pid, Msg) -> + send_after_from_agent(SendAfter, Time, Pid, Msg) + end + }. + +send_from_agent(Send, Dest, Msg) -> + case Dest =:= self() of + true -> + Send(Dest, ?agent_message(Msg)), + Msg; + false -> + Send(Dest, Msg) + end. + +send_after_from_agent(SendAfter, Time, Dest, Msg) -> + case Dest =:= self() of + true -> + SendAfter(Time, Dest, ?agent_message(Msg)); + false -> + SendAfter(Time, Dest, Msg) + end. + +-dialyzer({nowarn_function, now_ms/0}). +now_ms() -> + erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl new file mode 100644 index 000000000..31a80838f --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -0,0 +1,112 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_shared_subs_agent). + +-include("shared_subs_agent.hrl"). + +-type session_id() :: emqx_persistent_session_ds:id(). + +-type subscription() :: #{ + start_time := emqx_ds:time() +}. + +-type t() :: term(). +-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). + +-type opts() :: #{ + session_id := session_id(), + send_funs := #{ + send := fun((pid(), term()) -> term()), + send_after := fun((non_neg_integer(), pid(), term()) -> reference()) + } +}. + +%% TODO +%% This records goe through network, we better shrink them +%% * use integer keys +%% * somehow avoid passing stream and topic_filter — they both are part of the iterator +-type stream_lease() :: #{ + %% Used as "external" subscription_id + topic_filter := topic_filter(), + stream := emqx_ds:stream(), + iterator := emqx_ds:iterator() +}. + +-type stream_revoke() :: #{ + topic_filter := topic_filter(), + stream := emqx_ds:stream() +}. + +-type stream_progress() :: #{ + topic_filter := topic_filter(), + stream := emqx_ds:stream(), + iterator := emqx_ds:iterator() +}. + +-export_type([ + t/0, + subscription/0, + session_id/0, + stream_lease/0, + opts/0 +]). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/2, + on_stream_progress/2, + on_info/2, + + renew_streams/1 +]). + +%%-------------------------------------------------------------------- +%% Behaviour +%%-------------------------------------------------------------------- + +-callback new(opts()) -> t(). +-callback open([{topic_filter(), subscription()}], opts()) -> t(). +-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> + {ok, t()} | {error, term()}. +-callback on_unsubscribe(t(), topic_filter()) -> t(). +-callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-callback on_stream_progress(t(), [stream_progress()]) -> t(). +-callback on_info(t(), term()) -> t(). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec new(opts()) -> t(). +new(Opts) -> + ?shared_subs_agent:new(Opts). + +-spec open([{topic_filter(), subscription()}], opts()) -> t(). +open(Topics, Opts) -> + ?shared_subs_agent:open(Topics, Opts). + +-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> + {ok, t()} | {error, emqx_types:reason_code()}. +on_subscribe(Agent, TopicFilter, SubOpts) -> + ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts). + +-spec on_unsubscribe(t(), topic_filter()) -> t(). +on_unsubscribe(Agent, TopicFilter) -> + ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). + +-spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +renew_streams(Agent) -> + ?shared_subs_agent:renew_streams(Agent). + +-spec on_stream_progress(t(), [stream_progress()]) -> t(). +on_stream_progress(Agent, StreamProgress) -> + ?shared_subs_agent:on_stream_progress(Agent, StreamProgress). + +-spec on_info(t(), term()) -> t(). +on_info(Agent, Info) -> + ?shared_subs_agent:on_info(Agent, Info). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl new file mode 100644 index 000000000..4896fb01e --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_shared_subs_null_agent). + +-include("emqx_mqtt.hrl"). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/2, + on_stream_progress/2, + on_info/2, + + renew_streams/1 +]). + +-behaviour(emqx_persistent_session_ds_shared_subs_agent). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +new(_Opts) -> + undefined. + +open(_Topics, _Opts) -> + undefined. + +on_subscribe(_Agent, _TopicFilter, _SubOpts) -> + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}. + +on_unsubscribe(Agent, _TopicFilter) -> + Agent. + +renew_streams(Agent) -> + {[], [], Agent}. + +on_stream_progress(Agent, _StreamProgress) -> + Agent. + +on_info(Agent, _Info) -> + Agent. diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl index 7bf80cc2b..15d43c7cd 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl @@ -127,7 +127,12 @@ renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), S3 = update_stream_subscription_state_ids(S2), - emqx_persistent_session_ds_subs:fold( + %% For shared subscriptions, the streams are populated by + %% `emqx_persistent_session_ds_shared_subs`. + %% TODO + %% Move discovery of proper streams + %% out of the scheduler for complete symmetry? + fold_proper_subscriptions( fun (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) -> TopicFilter = emqx_topic:words(Key), @@ -206,9 +211,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> - ?SLOG(debug, #{ - msg => new_stream, key => Key, stream => Stream - }), case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of {ok, Iterator} -> NewStreamState = #srs{ @@ -420,3 +422,13 @@ shuffle(L0) -> L2 = lists:sort(L1), {_, L} = lists:unzip(L2), L. + +fold_proper_subscriptions(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions( + fun + (#share{}, _Sub, Acc0) -> Acc0; + (TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0) + end, + Acc, + S + ). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl index 3fa65eed5..e8422674b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl @@ -30,8 +30,7 @@ on_session_drop/2, gc/1, lookup/2, - to_map/1, - fold/3 + to_map/1 ]). %% Management API: @@ -160,7 +159,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) -> -spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok. on_session_drop(SessionId, S0) -> - fold( + _ = fold_proper_subscriptions( fun(TopicFilter, _Subscription, S) -> case on_unsubscribe(SessionId, TopicFilter, S) of {ok, S1, _} -> S1; @@ -169,10 +168,14 @@ on_session_drop(SessionId, S0) -> end, S0, S0 - ). + ), + ok. %% @doc Remove subscription states that don't have a parent, and that -%% don't have any unacked messages: +%% don't have any unacked messages. +%% TODO +%% This function collects shared subs as well +%% Move to a separate module to keep symmetry? -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). gc(S0) -> %% Create a set of subscription states IDs referenced either by a @@ -210,7 +213,7 @@ gc(S0) -> S0 ). -%% @doc Fold over active subscriptions: +%% @doc Lookup a subscription and merge it with its current state: -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds:subscription() | undefined. lookup(TopicFilter, S) -> @@ -230,22 +233,12 @@ lookup(TopicFilter, S) -> %% purpose: -spec to_map(emqx_persistent_session_ds_state:t()) -> map(). to_map(S) -> - fold( + fold_proper_subscriptions( fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end, #{}, S ). -%% @doc Fold over active subscriptions: --spec fold( - fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), - Acc, - emqx_persistent_session_ds_state:t() -) -> - Acc. -fold(Fun, Acc, S) -> - emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). - -spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> emqx_persistent_session_ds:subscription() | undefined. cold_get_subscription(SessionId, Topic) -> @@ -267,5 +260,15 @@ cold_get_subscription(SessionId, Topic) -> %% Internal functions %%================================================================================ +fold_proper_subscriptions(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions( + fun + (#share{}, _Sub, Acc0) -> Acc0; + (TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0) + end, + Acc, + S + ). + now_ms() -> erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl b/apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl index e18e2cb7f..f23cb030b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl @@ -71,4 +71,11 @@ sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id() }). +%% (Erlang) messages that session should forward to the +%% shared subscription handler. +-record(shared_sub_message, { + message :: term() +}). +-define(shared_sub_message(MSG), #shared_sub_message{message = MSG}). + -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl new file mode 100644 index 000000000..6ec42d36a --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(SHARED_SUBS_AGENT_HRL). +-define(SHARED_SUBS_AGENT_HRL, true). + +-ifdef(EMQX_RELEASE_EDITION). + +-if(?EMQX_RELEASE_EDITION == ee). + +%% agent from BSL app +% -define(shared_subs_agent, emqx_ds_shared_sub_agent). +%% Till full implementation we need to dispach to the null agent. +%% It will report "not implemented" error for attempts to use shared subscriptions. +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -if(?EMQX_RELEASE_EDITION == ee). +-else. + +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -if(?EMQX_RELEASE_EDITION == ee). +-endif. + +%% -ifdef(EMQX_RELEASE_EDITION). +-else. + +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -ifdef(EMQX_RELEASE_EDITION). +-endif. + +-endif. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 2cdd968f9..8735dc3c3 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -83,6 +83,7 @@ -export([ deliver/3, + handle_info/2, handle_timeout/3, disconnect/3, terminate/3 @@ -188,6 +189,10 @@ -callback destroy(t() | clientinfo()) -> ok. -callback clear_will_message(t()) -> t(). -callback publish_will_message_now(t(), message()) -> t(). +-callback handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) -> + {ok, replies(), t()} + | {ok, replies(), timeout(), t()}. +-callback handle_info(term(), t()) -> t(). %%-------------------------------------------------------------------- %% Create a Session @@ -484,6 +489,14 @@ enrich_subopts(_Opt, _V, Msg, _) -> handle_timeout(ClientInfo, Timer, Session) -> ?IMPL(Session):handle_timeout(ClientInfo, Timer, Session). +%%-------------------------------------------------------------------- +%% Generic Messages +%%-------------------------------------------------------------------- + +-spec handle_info(term(), t()) -> t(). +handle_info(Info, Session) -> + ?IMPL(Session):handle_info(Info, Session). + %%-------------------------------------------------------------------- -spec ensure_timer(custom_timer_name(), timeout(), map()) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index c3cc17926..f5568c146 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -87,6 +87,7 @@ deliver/3, replay/3, handle_timeout/3, + handle_info/2, disconnect/2, terminate/2 ]). @@ -597,6 +598,15 @@ handle_timeout(ClientInfo, retry_delivery, Session) -> handle_timeout(ClientInfo, expire_awaiting_rel, Session) -> expire(ClientInfo, Session). +%%-------------------------------------------------------------------- +%% Geneic messages +%%-------------------------------------------------------------------- + +-spec handle_info(term(), session()) -> session(). +handle_info(Msg, Session) -> + ?SLOG(warning, #{msg => emqx_session_mem_unknown_message, message => Msg}), + Session. + %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- diff --git a/apps/emqx_ds_shared_sub/BSL.txt b/apps/emqx_ds_shared_sub/BSL.txt new file mode 100644 index 000000000..35a9a5207 --- /dev/null +++ b/apps/emqx_ds_shared_sub/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2024 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2028-05-30 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_ds_shared_sub/README.md b/apps/emqx_ds_shared_sub/README.md new file mode 100644 index 000000000..41a5ab407 --- /dev/null +++ b/apps/emqx_ds_shared_sub/README.md @@ -0,0 +1,9 @@ +# EMQX Durable Shared Subscriptions + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_ds_shared_sub/rebar.config b/apps/emqx_ds_shared_sub/rebar.config new file mode 100644 index 000000000..3471031cd --- /dev/null +++ b/apps/emqx_ds_shared_sub/rebar.config @@ -0,0 +1,6 @@ +%% -*- mode: erlang; -*- + +{erl_opts, [debug_info]}. +{deps, [ + {emqx, {path, "../../apps/emqx"}} +]}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src new file mode 100644 index 000000000..f9698ad89 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub.app.src @@ -0,0 +1,15 @@ +{application, emqx_ds_shared_sub, [ + {description, "EMQX DS Shared Subscriptions"}, + {vsn, "0.1.0"}, + {registered, [emqx_ds_shared_sub_sup]}, + {mod, {emqx_ds_shared_sub_app, []}}, + {applications, [ + kernel, + stdlib, + emqx + ]}, + {env, []}, + {modules, []}, + + {links, []} +]}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl new file mode 100644 index 000000000..425d5df3b --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -0,0 +1,156 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_agent). + +-include_lib("emqx/include/emqx_persistent_message.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/2, + on_stream_progress/2, + on_info/2, + + renew_streams/1 +]). + +-behaviour(emqx_persistent_session_ds_shared_subs_agent). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +new(Opts) -> + init_state(Opts). + +open(TopicSubscriptions, Opts) -> + State0 = init_state(Opts), + State1 = lists:foldl( + fun({ShareTopicFilter, #{start_time := StartTime}}, State) -> + add_subscription(State, ShareTopicFilter, StartTime) + end, + State0, + TopicSubscriptions + ), + State1. + +on_subscribe(State0, TopicFilter, _SubOpts) -> + StartTime = now_ms(), + State1 = add_subscription(State0, TopicFilter, StartTime), + {ok, State1}. + +on_unsubscribe(State, TopicFilter) -> + delete_subscription(State, TopicFilter). + +renew_streams(State0) -> + State1 = do_renew_streams(State0), + {State2, StreamLeases} = stream_leases(State1), + {StreamLeases, [], State2}. + +on_stream_progress(State, _StreamProgress) -> + State. + +on_info(State, _Info) -> + State. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +init_state(Opts) -> + SessionId = maps:get(session_id, Opts), + SendFuns = maps:get(send_funs, Opts), + Send = maps:get(send, SendFuns), + SendAfter = maps:get(send_after, SendFuns), + #{ + session_id => SessionId, + send => Send, + end_after => SendAfter, + subscriptions => #{} + }. + +% send(State, Pid, Msg) -> +% Send = maps:get(send, State), +% Send(Pid, Msg). + +% send_after(State, Time, Pid, Msg) -> +% SendAfter = maps:get(send_after, State), +% SendAfter(Time, Pid, Msg). + +do_renew_streams(#{subscriptions := Subs0} = State0) -> + Subs1 = maps:map( + fun( + ShareTopicFilter, + #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub + ) -> + #share{topic = TopicFilterRaw} = ShareTopicFilter, + TopicFilter = emqx_topic:words(TopicFilterRaw), + {_, NewStreams} = lists:unzip( + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) + ), + {Streams1, NewLeases} = lists:foldl( + fun(Stream, {StreamsAcc, LeasesAcc}) -> + case StreamsAcc of + #{Stream := _} -> + {StreamsAcc, LeasesAcc}; + _ -> + {ok, It} = emqx_ds:make_iterator( + ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime + ), + StreamLease = #{ + topic_filter => ShareTopicFilter, + stream => Stream, + iterator => It + }, + {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]} + end + end, + {Streams0, []}, + NewStreams + ), + Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases} + end, + Subs0 + ), + State0#{subscriptions => Subs1}. + +delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) -> + #share{topic = TopicFilter} = ShareTopicFilter, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), + Subs1 = maps:remove(ShareTopicFilter, Subs0), + State0#{subscriptions => Subs1}. + +stream_leases(#{subscriptions := Subs0} = State0) -> + {Subs1, StreamLeases} = lists:foldl( + fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) -> + {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]} + end, + {Subs0, []}, + maps:to_list(Subs0) + ), + State1 = State0#{subscriptions => Subs1}, + {State1, lists:concat(StreamLeases)}. + +now_ms() -> + erlang:system_time(millisecond). + +add_subscription( + #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime +) -> + #share{topic = TopicFilter} = ShareTopicFilter, + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), + Subs1 = Subs0#{ + ShareTopicFilter => #{ + start_time => StartTime, + streams => #{}, + stream_leases => [] + } + }, + State1 = State0#{subscriptions => Subs1}, + State1. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl new file mode 100644 index 000000000..5c2d8d964 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl @@ -0,0 +1,23 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_app). + +-behaviour(application). + +%% application behaviour callbacks +-export([start/2, stop/1]). + +%%------------------------------------------------------------------------------ +%% application behaviour callbacks +%%------------------------------------------------------------------------------ + +-spec start(application:start_type(), term()) -> {ok, pid()}. +start(_Type, _Args) -> + {ok, Sup} = emqx_ds_shared_sub_sup:start_link(), + {ok, Sup}. + +-spec stop(term()) -> ok. +stop(_State) -> + ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl new file mode 100644 index 000000000..dd6100f49 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_sup.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% supervisor behaviour callbacks +-export([init/1]). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%------------------------------------------------------------------------------ +%% supervisor behaviour callbacks +%%------------------------------------------------------------------------------ + +init([]) -> + SupFlags = #{ + strategy => one_for_one, + intensity => 10, + period => 10 + }, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index f59e21ccd..54021a3e5 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -130,7 +130,8 @@ emqx_gateway_ocpp, emqx_gateway_jt808, emqx_bridge_syskeeper, - emqx_bridge_confluent + emqx_bridge_confluent, + emqx_ds_shared_sub ], %% must always be of type `load' ce_business_apps => diff --git a/mix.exs b/mix.exs index 4a5ec3b1a..b3ecc6252 100644 --- a/mix.exs +++ b/mix.exs @@ -202,7 +202,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_gateway_gbt32960, :emqx_gateway_ocpp, :emqx_gateway_jt808, - :emqx_bridge_syskeeper + :emqx_bridge_syskeeper, + :emqx_ds_shared_sub ]) end diff --git a/rebar.config.erl b/rebar.config.erl index 3fda433c5..34829f9d1 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -120,6 +120,7 @@ is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; is_community_umbrella_app("apps/emqx_schema_validation") -> false; is_community_umbrella_app("apps/emqx_eviction_agent") -> false; is_community_umbrella_app("apps/emqx_node_rebalance") -> false; +is_community_umbrella_app("apps/emqx_ds_shared_sub") -> false; is_community_umbrella_app(_) -> true. %% BUILD_WITHOUT_JQ