diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 3897028cd..cc599eb06 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -367,7 +367,7 @@ print_session(ClientId) -> %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %%-------------------------------------------------------------------- -%% Suppress warnings about clauses handling unimplemented reuslts +%% Suppress warnings about clauses handling unimplemented results %% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3` -dialyzer({nowarn_function, subscribe/3}). -spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> @@ -378,9 +378,9 @@ subscribe( Session ) -> case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of - {ok, S1} -> - S = emqx_persistent_session_ds_state:commit(S1), - {ok, Session#{s => S}}; + {ok, S0, SharedSubS} -> + S = emqx_persistent_session_ds_state:commit(S0), + {ok, Session#{s => S, shared_sub_s => SharedSubS}}; Error = {error, _} -> Error end; @@ -397,20 +397,24 @@ subscribe( Error end. -%% Suppress warnings about clauses handling unimplemented reuslts -%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/4` +%% Suppress warnings about clauses handling unimplemented results +%% of `emqx_persistent_session_ds_shared_subs:on_unsubscribe/4` -dialyzer({nowarn_function, unsubscribe/2}). -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( #share{} = TopicFilter, - Session = #{id := SessionId, s := S0} + Session = #{id := SessionId, s := S0, shared_sub_s := SharedSubS0} ) -> - case emqx_persistent_session_ds_shared_subs:on_unsubscribe(SessionId, TopicFilter, S0) of - {ok, S1, #{id := SubId, subopts := SubOpts}} -> + case + emqx_persistent_session_ds_shared_subs:on_unsubscribe( + SessionId, TopicFilter, S0, SharedSubS0 + ) + of + {ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} -> S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), S = emqx_persistent_session_ds_state:commit(S2), - {ok, Session#{s => S}, SubOpts}; + {ok, Session#{s => S, shared_sub_s => SharedSubS1}, SubOpts}; Error = {error, _} -> Error end; diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 36d961dc2..c355af0a6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -4,7 +4,8 @@ -module(emqx_persistent_session_ds_shared_subs). --include_lib("emqx_mqtt.hrl"). +-include("emqx_mqtt.hrl"). +-include("logger.hrl"). -include("session_internals.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). @@ -46,7 +47,7 @@ %% API %%-------------------------------------------------------------------- --spec new(emqx_persistent_session_ds:shared_sub_opts()) -> t(). +-spec new(opts()) -> t(). new(Opts) -> #{ agent => emqx_persistent_session_ds_shared_subs_agent:new( @@ -54,7 +55,7 @@ new(Opts) -> ) }. --spec open(emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:shared_sub_opts()) -> +-spec open(emqx_persistent_session_ds_state:t(), opts()) -> {ok, emqx_persistent_session_ds_state:t(), t()}. open(S, Opts) -> SharedSubscriptions = fold_shared_subs( @@ -71,10 +72,9 @@ open(S, Opts) -> {ok, S, SharedSubS}. -spec on_subscribe( - emqx_persistent_session_ds_state:t(), - t(), share_topic_filter(), - emqx_types:subopts() + emqx_types:subopts(), + emqx_persistent_session_ds:session() ) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}. on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) -> Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S), @@ -110,6 +110,10 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> {NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( Agent0 ), + NewLeasedStreams =/= [] andalso + ?SLOG( + info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams} + ), S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams), S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams), SharedSubS1 = SharedSubS0#{agent => Agent1}, @@ -118,7 +122,7 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> -spec on_streams_replayed( emqx_persistent_session_ds_state:t(), t() -) -> t(). +) -> {emqx_persistent_session_ds_state:t(), t()}. on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) -> %% TODO %% Is it sufficient for a report? @@ -208,6 +212,7 @@ on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Sessio on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> update_subscription(Subscription, TopicFilter, SubOpts, Session). +-dialyzer({nowarn_function, create_new_subscription/3}). create_new_subscription(TopicFilter, SubOpts, #{ id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props }) -> @@ -286,17 +291,22 @@ accept_stream( %% and should not have passed this stream as a new one error(new_stream_without_sub); #{id := SubId, current_state := SStateId} -> - NewSRS = - #srs{ - rank_x = ?rank_x, - rank_y = ?rank_y, - it_begin = Iterator, - it_end = Iterator, - sub_state_id = SStateId - }, Key = {SubId, Stream}, - S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0), - S1 + case emqx_persistent_session_ds_state:get_stream(Key, S0) of + undefined -> + NewSRS = + #srs{ + rank_x = ?rank_x, + rank_y = ?rank_y, + it_begin = Iterator, + it_end = Iterator, + sub_state_id = SStateId + }, + S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0), + S1; + _SRS -> + S0 + end end. revoke_stream( @@ -364,5 +374,6 @@ send_after_from_agent(SendAfter, Time, Dest, Msg) -> SendAfter(Time, Dest, Msg) end. +-dialyzer({nowarn_function, now_ms/0}). now_ms() -> erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 259066b20..31a80838f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -4,6 +4,8 @@ -module(emqx_persistent_session_ds_shared_subs_agent). +-include("shared_subs_agent.hrl"). + -type session_id() :: emqx_persistent_session_ds:id(). -type subscription() :: #{ @@ -57,48 +59,54 @@ on_subscribe/3, on_unsubscribe/2, - on_session_drop/1, on_stream_progress/2, on_info/2, renew_streams/1 ]). +%%-------------------------------------------------------------------- +%% Behaviour +%%-------------------------------------------------------------------- + +-callback new(opts()) -> t(). +-callback open([{topic_filter(), subscription()}], opts()) -> t(). +-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> + {ok, t()} | {error, term()}. +-callback on_unsubscribe(t(), topic_filter()) -> t(). +-callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. +-callback on_stream_progress(t(), [stream_progress()]) -> t(). +-callback on_info(t(), term()) -> t(). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- -spec new(opts()) -> t(). -new(_Opts) -> - undefined. +new(Opts) -> + ?shared_subs_agent:new(Opts). -spec open([{topic_filter(), subscription()}], opts()) -> t(). -open(_Topics, _Opts) -> - undefined. +open(Topics, Opts) -> + ?shared_subs_agent:open(Topics, Opts). -spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> - {ok, t()} | {error, term()}. -on_subscribe(Agent, _TopicFilter, _SubOpts) -> - % {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED} - {ok, Agent}. + {ok, t()} | {error, emqx_types:reason_code()}. +on_subscribe(Agent, TopicFilter, SubOpts) -> + ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts). -spec on_unsubscribe(t(), topic_filter()) -> t(). -on_unsubscribe(Agent, _TopicFilter) -> - Agent. - --spec on_session_drop(t()) -> t(). -on_session_drop(Agent) -> - Agent. +on_unsubscribe(Agent, TopicFilter) -> + ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). -spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}. renew_streams(Agent) -> - {[], [], Agent}. + ?shared_subs_agent:renew_streams(Agent). -spec on_stream_progress(t(), [stream_progress()]) -> t(). -on_stream_progress(Agent, _StreamProgress) -> - Agent. +on_stream_progress(Agent, StreamProgress) -> + ?shared_subs_agent:on_stream_progress(Agent, StreamProgress). -spec on_info(t(), term()) -> t(). -on_info(Agent, _Info) -> - Agent. +on_info(Agent, Info) -> + ?shared_subs_agent:on_info(Agent, Info). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl new file mode 100644 index 000000000..4896fb01e --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_ds_shared_subs_null_agent). + +-include("emqx_mqtt.hrl"). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/2, + on_stream_progress/2, + on_info/2, + + renew_streams/1 +]). + +-behaviour(emqx_persistent_session_ds_shared_subs_agent). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +new(_Opts) -> + undefined. + +open(_Topics, _Opts) -> + undefined. + +on_subscribe(_Agent, _TopicFilter, _SubOpts) -> + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}. + +on_unsubscribe(Agent, _TopicFilter) -> + Agent. + +renew_streams(Agent) -> + {[], [], Agent}. + +on_stream_progress(Agent, _StreamProgress) -> + Agent. + +on_info(Agent, _Info) -> + Agent. diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl index 03e343864..15d43c7cd 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_stream_scheduler.erl @@ -16,7 +16,7 @@ -module(emqx_persistent_session_ds_stream_scheduler). %% API: --export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2, shuffle/1]). +-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]). -export([renew_streams/1, on_unsubscribe/2]). %% behavior callbacks: @@ -87,20 +87,22 @@ find_new_streams(S) -> %% after timeout?) Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), - emqx_persistent_session_ds_state:fold_streams( - fun - (_Key, #srs{it_end = end_of_stream}, Acc) -> - Acc; - (Key, Stream, Acc) -> - case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of - true -> - [{Key, Stream} | Acc]; - false -> - Acc - end - end, - [], - S + shuffle( + emqx_persistent_session_ds_state:fold_streams( + fun + (_Key, #srs{it_end = end_of_stream}, Acc) -> + Acc; + (Key, Stream, Acc) -> + case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of + true -> + [{Key, Stream} | Acc]; + false -> + Acc + end + end, + [], + S + ) ). %% @doc This function makes the session aware of the new streams. @@ -201,19 +203,6 @@ is_fully_acked(Srs, S) -> CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), is_fully_acked(CommQos1, CommQos2, Srs). --spec shuffle([A]) -> [A]. -shuffle(L0) -> - L1 = lists:map( - fun(A) -> - %% maybe topic/stream prioritization could be introduced here? - {rand:uniform(), A} - end, - L0 - ), - L2 = lists:sort(L1), - {_, L} = lists:unzip(L2), - L. - %%================================================================================ %% Internal functions %%================================================================================ @@ -222,9 +211,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> - ?SLOG(debug, #{ - msg => new_stream, key => Key, stream => Stream - }), case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of {ok, Iterator} -> NewStreamState = #srs{ @@ -424,6 +410,19 @@ is_fully_acked(_, _, #srs{ is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). +-spec shuffle([A]) -> [A]. +shuffle(L0) -> + L1 = lists:map( + fun(A) -> + %% maybe topic/stream prioritization could be introduced here? + {rand:uniform(), A} + end, + L0 + ), + L2 = lists:sort(L1), + {_, L} = lists:unzip(L2), + L. + fold_proper_subscriptions(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( fun diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl index 8db3b240b..e8422674b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl @@ -172,7 +172,10 @@ on_session_drop(SessionId, S0) -> ok. %% @doc Remove subscription states that don't have a parent, and that -%% don't have any unacked messages: +%% don't have any unacked messages. +%% TODO +%% This function collects shared subs as well +%% Move to a separate module to keep symmetry? -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). gc(S0) -> %% Create a set of subscription states IDs referenced either by a diff --git a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl new file mode 100644 index 000000000..6ec42d36a --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-ifndef(SHARED_SUBS_AGENT_HRL). +-define(SHARED_SUBS_AGENT_HRL, true). + +-ifdef(EMQX_RELEASE_EDITION). + +-if(?EMQX_RELEASE_EDITION == ee). + +%% agent from BSL app +% -define(shared_subs_agent, emqx_ds_shared_sub_agent). +%% Till full implementation we need to dispach to the null agent. +%% It will report "not implemented" error for attempts to use shared subscriptions. +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -if(?EMQX_RELEASE_EDITION == ee). +-else. + +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -if(?EMQX_RELEASE_EDITION == ee). +-endif. + +%% -ifdef(EMQX_RELEASE_EDITION). +-else. + +-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). + +%% -ifdef(EMQX_RELEASE_EDITION). +-endif. + +-endif. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl new file mode 100644 index 000000000..425d5df3b --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -0,0 +1,156 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_agent). + +-include_lib("emqx/include/emqx_persistent_message.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([ + new/1, + open/2, + + on_subscribe/3, + on_unsubscribe/2, + on_stream_progress/2, + on_info/2, + + renew_streams/1 +]). + +-behaviour(emqx_persistent_session_ds_shared_subs_agent). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +new(Opts) -> + init_state(Opts). + +open(TopicSubscriptions, Opts) -> + State0 = init_state(Opts), + State1 = lists:foldl( + fun({ShareTopicFilter, #{start_time := StartTime}}, State) -> + add_subscription(State, ShareTopicFilter, StartTime) + end, + State0, + TopicSubscriptions + ), + State1. + +on_subscribe(State0, TopicFilter, _SubOpts) -> + StartTime = now_ms(), + State1 = add_subscription(State0, TopicFilter, StartTime), + {ok, State1}. + +on_unsubscribe(State, TopicFilter) -> + delete_subscription(State, TopicFilter). + +renew_streams(State0) -> + State1 = do_renew_streams(State0), + {State2, StreamLeases} = stream_leases(State1), + {StreamLeases, [], State2}. + +on_stream_progress(State, _StreamProgress) -> + State. + +on_info(State, _Info) -> + State. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +init_state(Opts) -> + SessionId = maps:get(session_id, Opts), + SendFuns = maps:get(send_funs, Opts), + Send = maps:get(send, SendFuns), + SendAfter = maps:get(send_after, SendFuns), + #{ + session_id => SessionId, + send => Send, + end_after => SendAfter, + subscriptions => #{} + }. + +% send(State, Pid, Msg) -> +% Send = maps:get(send, State), +% Send(Pid, Msg). + +% send_after(State, Time, Pid, Msg) -> +% SendAfter = maps:get(send_after, State), +% SendAfter(Time, Pid, Msg). + +do_renew_streams(#{subscriptions := Subs0} = State0) -> + Subs1 = maps:map( + fun( + ShareTopicFilter, + #{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub + ) -> + #share{topic = TopicFilterRaw} = ShareTopicFilter, + TopicFilter = emqx_topic:words(TopicFilterRaw), + {_, NewStreams} = lists:unzip( + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) + ), + {Streams1, NewLeases} = lists:foldl( + fun(Stream, {StreamsAcc, LeasesAcc}) -> + case StreamsAcc of + #{Stream := _} -> + {StreamsAcc, LeasesAcc}; + _ -> + {ok, It} = emqx_ds:make_iterator( + ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime + ), + StreamLease = #{ + topic_filter => ShareTopicFilter, + stream => Stream, + iterator => It + }, + {StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]} + end + end, + {Streams0, []}, + NewStreams + ), + Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases} + end, + Subs0 + ), + State0#{subscriptions => Subs1}. + +delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) -> + #share{topic = TopicFilter} = ShareTopicFilter, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), + Subs1 = maps:remove(ShareTopicFilter, Subs0), + State0#{subscriptions => Subs1}. + +stream_leases(#{subscriptions := Subs0} = State0) -> + {Subs1, StreamLeases} = lists:foldl( + fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) -> + {SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]} + end, + {Subs0, []}, + maps:to_list(Subs0) + ), + State1 = State0#{subscriptions => Subs1}, + {State1, lists:concat(StreamLeases)}. + +now_ms() -> + erlang:system_time(millisecond). + +add_subscription( + #{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime +) -> + #share{topic = TopicFilter} = ShareTopicFilter, + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), + Subs1 = Subs0#{ + ShareTopicFilter => #{ + start_time => StartTime, + streams => #{}, + stream_leases => [] + } + }, + State1 = State0#{subscriptions => Subs1}, + State1. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_subs.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_subs.erl deleted file mode 100644 index 2728ef3bc..000000000 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_subs.erl +++ /dev/null @@ -1,5 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ds_shared_subs).