Merge pull request #13163 from savonarola/0523-ds-shared-subs-dispatch
Inject shared subcription handling into durable session
This commit is contained in:
commit
dc2e6d1695
|
@ -20,4 +20,11 @@
|
||||||
-define(IS_SESSION_IMPL_MEM(S), (is_tuple(S) andalso element(1, S) =:= session)).
|
-define(IS_SESSION_IMPL_MEM(S), (is_tuple(S) andalso element(1, S) =:= session)).
|
||||||
-define(IS_SESSION_IMPL_DS(S), (is_map_key(id, S))).
|
-define(IS_SESSION_IMPL_DS(S), (is_map_key(id, S))).
|
||||||
|
|
||||||
|
%% (Erlang) messages that a connection process should forward to the
|
||||||
|
%% session handler.
|
||||||
|
-record(session_message, {
|
||||||
|
message :: term()
|
||||||
|
}).
|
||||||
|
-define(session_message(MSG), #session_message{message = MSG}).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_channel.hrl").
|
-include("emqx_channel.hrl").
|
||||||
|
-include("emqx_session.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
-include("emqx_access_control.hrl").
|
-include("emqx_access_control.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
@ -1299,6 +1300,9 @@ handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) ->
|
||||||
[] -> {ok, Channel};
|
[] -> {ok, Channel};
|
||||||
Msgs -> {ok, Msgs, Channel}
|
Msgs -> {ok, Msgs, Channel}
|
||||||
end;
|
end;
|
||||||
|
handle_info(?session_message(Message), #channel{session = Session} = Channel) ->
|
||||||
|
NSession = emqx_session:handle_info(Message, Session),
|
||||||
|
{ok, Channel#channel{session = NSession}};
|
||||||
handle_info(Info, Channel) ->
|
handle_info(Info, Channel) ->
|
||||||
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
||||||
{ok, Channel}.
|
{ok, Channel}.
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
-include("emqx_session.hrl").
|
||||||
-include("emqx_persistent_session_ds/session_internals.hrl").
|
-include("emqx_persistent_session_ds/session_internals.hrl").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
@ -63,6 +64,7 @@
|
||||||
deliver/3,
|
deliver/3,
|
||||||
replay/3,
|
replay/3,
|
||||||
handle_timeout/3,
|
handle_timeout/3,
|
||||||
|
handle_info/2,
|
||||||
disconnect/2,
|
disconnect/2,
|
||||||
terminate/2
|
terminate/2
|
||||||
]).
|
]).
|
||||||
|
@ -106,6 +108,7 @@
|
||||||
seqno/0,
|
seqno/0,
|
||||||
timestamp/0,
|
timestamp/0,
|
||||||
topic_filter/0,
|
topic_filter/0,
|
||||||
|
share_topic_filter/0,
|
||||||
subscription_id/0,
|
subscription_id/0,
|
||||||
subscription/0,
|
subscription/0,
|
||||||
session/0,
|
session/0,
|
||||||
|
@ -117,7 +120,8 @@
|
||||||
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
|
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
|
||||||
%% an atom, in theory (?).
|
%% an atom, in theory (?).
|
||||||
-type id() :: binary().
|
-type id() :: binary().
|
||||||
-type topic_filter() :: emqx_types:topic() | #share{}.
|
-type share_topic_filter() :: #share{}.
|
||||||
|
-type topic_filter() :: emqx_types:topic() | share_topic_filter().
|
||||||
|
|
||||||
%% Subscription and subscription states:
|
%% Subscription and subscription states:
|
||||||
%%
|
%%
|
||||||
|
@ -155,6 +159,8 @@
|
||||||
subopts := map()
|
subopts := map()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
-type shared_sub_state() :: term().
|
||||||
|
|
||||||
-define(TIMER_PULL, timer_pull).
|
-define(TIMER_PULL, timer_pull).
|
||||||
-define(TIMER_GET_STREAMS, timer_get_streams).
|
-define(TIMER_GET_STREAMS, timer_get_streams).
|
||||||
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
|
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
|
||||||
|
@ -172,6 +178,8 @@
|
||||||
props := map(),
|
props := map(),
|
||||||
%% Persistent state:
|
%% Persistent state:
|
||||||
s := emqx_persistent_session_ds_state:t(),
|
s := emqx_persistent_session_ds_state:t(),
|
||||||
|
%% Shared subscription state:
|
||||||
|
shared_sub_s := shared_sub_state(),
|
||||||
%% Buffer:
|
%% Buffer:
|
||||||
inflight := emqx_persistent_session_ds_inflight:t(),
|
inflight := emqx_persistent_session_ds_inflight:t(),
|
||||||
%% In-progress replay:
|
%% In-progress replay:
|
||||||
|
@ -277,8 +285,11 @@ info(created_at, #{s := S}) ->
|
||||||
emqx_persistent_session_ds_state:get_created_at(S);
|
emqx_persistent_session_ds_state:get_created_at(S);
|
||||||
info(is_persistent, #{}) ->
|
info(is_persistent, #{}) ->
|
||||||
true;
|
true;
|
||||||
info(subscriptions, #{s := S}) ->
|
info(subscriptions, #{s := S, shared_sub_s := SharedSubS}) ->
|
||||||
emqx_persistent_session_ds_subs:to_map(S);
|
maps:merge(
|
||||||
|
emqx_persistent_session_ds_subs:to_map(S),
|
||||||
|
emqx_persistent_session_ds_shared_subs:to_map(S, SharedSubS)
|
||||||
|
);
|
||||||
info(subscriptions_cnt, #{s := S}) ->
|
info(subscriptions_cnt, #{s := S}) ->
|
||||||
emqx_persistent_session_ds_state:n_subscriptions(S);
|
emqx_persistent_session_ds_state:n_subscriptions(S);
|
||||||
info(subscriptions_max, #{props := Conf}) ->
|
info(subscriptions_max, #{props := Conf}) ->
|
||||||
|
@ -356,15 +367,23 @@ print_session(ClientId) ->
|
||||||
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
|
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% Suppress warnings about clauses handling unimplemented results
|
||||||
|
%% of `emqx_persistent_session_ds_shared_subs:on_subscribe/3`
|
||||||
|
-dialyzer({nowarn_function, subscribe/3}).
|
||||||
-spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
|
-spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
|
||||||
{ok, session()} | {error, emqx_types:reason_code()}.
|
{ok, session()} | {error, emqx_types:reason_code()}.
|
||||||
subscribe(
|
subscribe(
|
||||||
#share{},
|
#share{} = TopicFilter,
|
||||||
_SubOpts,
|
SubOpts,
|
||||||
_Session
|
Session
|
||||||
) ->
|
) ->
|
||||||
%% TODO: Shared subscriptions are not supported yet:
|
case emqx_persistent_session_ds_shared_subs:on_subscribe(TopicFilter, SubOpts, Session) of
|
||||||
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
|
{ok, S0, SharedSubS} ->
|
||||||
|
S = emqx_persistent_session_ds_state:commit(S0),
|
||||||
|
{ok, Session#{s => S, shared_sub_s => SharedSubS}};
|
||||||
|
Error = {error, _} ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
subscribe(
|
subscribe(
|
||||||
TopicFilter,
|
TopicFilter,
|
||||||
SubOpts,
|
SubOpts,
|
||||||
|
@ -378,8 +397,27 @@ subscribe(
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Suppress warnings about clauses handling unimplemented results
|
||||||
|
%% of `emqx_persistent_session_ds_shared_subs:on_unsubscribe/4`
|
||||||
|
-dialyzer({nowarn_function, unsubscribe/2}).
|
||||||
-spec unsubscribe(topic_filter(), session()) ->
|
-spec unsubscribe(topic_filter(), session()) ->
|
||||||
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
|
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
|
||||||
|
unsubscribe(
|
||||||
|
#share{} = TopicFilter,
|
||||||
|
Session = #{id := SessionId, s := S0, shared_sub_s := SharedSubS0}
|
||||||
|
) ->
|
||||||
|
case
|
||||||
|
emqx_persistent_session_ds_shared_subs:on_unsubscribe(
|
||||||
|
SessionId, TopicFilter, S0, SharedSubS0
|
||||||
|
)
|
||||||
|
of
|
||||||
|
{ok, S1, SharedSubS1, #{id := SubId, subopts := SubOpts}} ->
|
||||||
|
S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
|
||||||
|
S = emqx_persistent_session_ds_state:commit(S2),
|
||||||
|
{ok, Session#{s => S, shared_sub_s => SharedSubS1}, SubOpts};
|
||||||
|
Error = {error, _} ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
unsubscribe(
|
unsubscribe(
|
||||||
TopicFilter,
|
TopicFilter,
|
||||||
Session = #{id := SessionId, s := S0}
|
Session = #{id := SessionId, s := S0}
|
||||||
|
@ -540,6 +578,8 @@ pubcomp(_ClientInfo, PacketId, Session0) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
%% Delivers
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
|
-spec deliver(clientinfo(), [emqx_types:deliver()], session()) ->
|
||||||
{ok, replies(), session()}.
|
{ok, replies(), session()}.
|
||||||
|
@ -551,6 +591,10 @@ deliver(ClientInfo, Delivers, Session0) ->
|
||||||
),
|
),
|
||||||
{ok, [], pull_now(Session)}.
|
{ok, [], pull_now(Session)}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Timeouts
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec handle_timeout(clientinfo(), _Timeout, session()) ->
|
-spec handle_timeout(clientinfo(), _Timeout, session()) ->
|
||||||
{ok, replies(), session()} | {ok, replies(), timeout(), session()}.
|
{ok, replies(), session()} | {ok, replies(), timeout(), session()}.
|
||||||
handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
||||||
|
@ -573,14 +617,15 @@ handle_timeout(ClientInfo, ?TIMER_PULL, Session0) ->
|
||||||
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
|
handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
|
||||||
Session = replay_streams(Session0, ClientInfo),
|
Session = replay_streams(Session0, ClientInfo),
|
||||||
{ok, [], Session};
|
{ok, [], Session};
|
||||||
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
|
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
|
||||||
S1 = emqx_persistent_session_ds_subs:gc(S0),
|
S1 = emqx_persistent_session_ds_subs:gc(S0),
|
||||||
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
||||||
|
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
|
||||||
Interval = get_config(ClientInfo, [renew_streams_interval]),
|
Interval = get_config(ClientInfo, [renew_streams_interval]),
|
||||||
Session = emqx_session:ensure_timer(
|
Session = emqx_session:ensure_timer(
|
||||||
?TIMER_GET_STREAMS,
|
?TIMER_GET_STREAMS,
|
||||||
Interval,
|
Interval,
|
||||||
Session0#{s => S}
|
Session0#{s => S, shared_sub_s => SharedSubS}
|
||||||
),
|
),
|
||||||
{ok, [], Session};
|
{ok, [], Session};
|
||||||
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
|
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
|
||||||
|
@ -601,6 +646,45 @@ handle_timeout(_ClientInfo, Timeout, Session) ->
|
||||||
?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
|
?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
|
||||||
{ok, [], Session}.
|
{ok, [], Session}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Generic messages
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_info(term(), session()) -> session().
|
||||||
|
handle_info(?shared_sub_message(Msg), Session = #{s := S0, shared_sub_s := SharedSubS0}) ->
|
||||||
|
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_info(S0, SharedSubS0, Msg),
|
||||||
|
Session#{s => S, shared_sub_s => SharedSubS}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Shared subscription outgoing messages
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
shared_sub_opts(SessionId) ->
|
||||||
|
#{
|
||||||
|
session_id => SessionId,
|
||||||
|
send_funs => #{
|
||||||
|
send => fun send_message/2,
|
||||||
|
send_after => fun send_message_after/3
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
send_message(Dest, Msg) ->
|
||||||
|
case Dest =:= self() of
|
||||||
|
true ->
|
||||||
|
erlang:send(Dest, ?session_message(?shared_sub_message(Msg))),
|
||||||
|
Msg;
|
||||||
|
false ->
|
||||||
|
erlang:send(Dest, Msg)
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_message_after(Time, Dest, Msg) ->
|
||||||
|
case Dest =:= self() of
|
||||||
|
true ->
|
||||||
|
erlang:send_after(Time, Dest, ?session_message(?shared_sub_message(Msg)));
|
||||||
|
false ->
|
||||||
|
erlang:send_after(Time, Dest, Msg)
|
||||||
|
end.
|
||||||
|
|
||||||
bump_last_alive(S0) ->
|
bump_last_alive(S0) ->
|
||||||
%% Note: we take a pessimistic approach here and assume that the client will be alive
|
%% Note: we take a pessimistic approach here and assume that the client will be alive
|
||||||
%% until the next bump timeout. With this, we avoid garbage collecting this session
|
%% until the next bump timeout. With this, we avoid garbage collecting this session
|
||||||
|
@ -814,13 +898,17 @@ session_open(
|
||||||
S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
|
S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
|
||||||
S5 = set_clientinfo(ClientInfo, S4),
|
S5 = set_clientinfo(ClientInfo, S4),
|
||||||
S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
|
S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
|
||||||
S = emqx_persistent_session_ds_state:commit(S6),
|
{ok, S7, SharedSubS} = emqx_persistent_session_ds_shared_subs:open(
|
||||||
|
S6, shared_sub_opts(SessionId)
|
||||||
|
),
|
||||||
|
S = emqx_persistent_session_ds_state:commit(S7),
|
||||||
Inflight = emqx_persistent_session_ds_inflight:new(
|
Inflight = emqx_persistent_session_ds_inflight:new(
|
||||||
receive_maximum(NewConnInfo)
|
receive_maximum(NewConnInfo)
|
||||||
),
|
),
|
||||||
#{
|
#{
|
||||||
id => SessionId,
|
id => SessionId,
|
||||||
s => S,
|
s => S,
|
||||||
|
shared_sub_s => SharedSubS,
|
||||||
inflight => Inflight,
|
inflight => Inflight,
|
||||||
props => #{}
|
props => #{}
|
||||||
}
|
}
|
||||||
|
@ -869,6 +957,7 @@ session_ensure_new(
|
||||||
id => Id,
|
id => Id,
|
||||||
props => Conf,
|
props => Conf,
|
||||||
s => S,
|
s => S,
|
||||||
|
shared_sub_s => emqx_persistent_session_ds_shared_subs:new(shared_sub_opts(Id)),
|
||||||
inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
|
inflight => emqx_persistent_session_ds_inflight:new(receive_maximum(ConnInfo))
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -879,8 +968,8 @@ session_drop(SessionId, Reason) ->
|
||||||
case emqx_persistent_session_ds_state:open(SessionId) of
|
case emqx_persistent_session_ds_state:open(SessionId) of
|
||||||
{ok, S0} ->
|
{ok, S0} ->
|
||||||
?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
|
?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
|
||||||
emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
|
ok = emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
|
||||||
emqx_persistent_session_ds_state:delete(SessionId);
|
ok = emqx_persistent_session_ds_state:delete(SessionId);
|
||||||
undefined ->
|
undefined ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
@ -917,9 +1006,12 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
|
||||||
%% Normal replay:
|
%% Normal replay:
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
fetch_new_messages(Session = #{s := S}, ClientInfo) ->
|
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
|
||||||
Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S),
|
Streams = emqx_persistent_session_ds_stream_scheduler:find_new_streams(S0),
|
||||||
fetch_new_messages(Streams, Session, ClientInfo).
|
Session1 = fetch_new_messages(Streams, Session0, ClientInfo),
|
||||||
|
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
|
||||||
|
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
|
||||||
|
Session1#{s => S2, shared_sub_s => SharedSubS1}.
|
||||||
|
|
||||||
fetch_new_messages([], Session, _ClientInfo) ->
|
fetch_new_messages([], Session, _ClientInfo) ->
|
||||||
Session;
|
Session;
|
||||||
|
|
|
@ -0,0 +1,379 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_ds_shared_subs).
|
||||||
|
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
-include("logger.hrl").
|
||||||
|
-include("session_internals.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/trace.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new/1,
|
||||||
|
open/2,
|
||||||
|
|
||||||
|
on_subscribe/3,
|
||||||
|
on_unsubscribe/4,
|
||||||
|
|
||||||
|
on_streams_replayed/2,
|
||||||
|
on_info/3,
|
||||||
|
|
||||||
|
renew_streams/2,
|
||||||
|
to_map/2
|
||||||
|
]).
|
||||||
|
|
||||||
|
-record(agent_message, {
|
||||||
|
message :: term()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type t() :: #{
|
||||||
|
agent := emqx_persistent_session_ds_shared_subs_agent:t()
|
||||||
|
}.
|
||||||
|
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
|
||||||
|
-type opts() :: #{
|
||||||
|
session_id := emqx_persistent_session_ds:id(),
|
||||||
|
send_funs := #{
|
||||||
|
send := fun((pid(), term()) -> term()),
|
||||||
|
send_after := fun((non_neg_integer(), pid(), term()) -> reference())
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
-define(agent_message(Msg), #agent_message{message = Msg}).
|
||||||
|
-define(rank_x, rank_shared).
|
||||||
|
-define(rank_y, 0).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec new(opts()) -> t().
|
||||||
|
new(Opts) ->
|
||||||
|
#{
|
||||||
|
agent => emqx_persistent_session_ds_shared_subs_agent:new(
|
||||||
|
agent_opts(Opts)
|
||||||
|
)
|
||||||
|
}.
|
||||||
|
|
||||||
|
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
|
||||||
|
{ok, emqx_persistent_session_ds_state:t(), t()}.
|
||||||
|
open(S, Opts) ->
|
||||||
|
SharedSubscriptions = fold_shared_subs(
|
||||||
|
fun(#share{} = TopicFilter, Sub, Acc) ->
|
||||||
|
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
S
|
||||||
|
),
|
||||||
|
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
|
||||||
|
SharedSubscriptions, agent_opts(Opts)
|
||||||
|
),
|
||||||
|
SharedSubS = #{agent => Agent},
|
||||||
|
{ok, S, SharedSubS}.
|
||||||
|
|
||||||
|
-spec on_subscribe(
|
||||||
|
share_topic_filter(),
|
||||||
|
emqx_types:subopts(),
|
||||||
|
emqx_persistent_session_ds:session()
|
||||||
|
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
|
||||||
|
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
|
||||||
|
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
|
||||||
|
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
|
||||||
|
|
||||||
|
-spec on_unsubscribe(
|
||||||
|
emqx_persistent_session_ds:id(),
|
||||||
|
emqx_persistent_session_ds:topic_filter(),
|
||||||
|
emqx_persistent_session_ds_state:t(),
|
||||||
|
t()
|
||||||
|
) ->
|
||||||
|
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
|
||||||
|
| {error, emqx_types:reason_code()}.
|
||||||
|
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
|
||||||
|
case lookup(TopicFilter, S0) of
|
||||||
|
undefined ->
|
||||||
|
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
||||||
|
Subscription ->
|
||||||
|
?tp(persistent_session_ds_subscription_delete, #{
|
||||||
|
session_id => SessionId, topic_filter => TopicFilter
|
||||||
|
}),
|
||||||
|
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
|
||||||
|
Agent0, TopicFilter
|
||||||
|
),
|
||||||
|
SharedSubS = SharedSubS0#{agent => Agent1},
|
||||||
|
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
|
||||||
|
{ok, S, SharedSubS, Subscription}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
|
||||||
|
{emqx_persistent_session_ds_state:t(), t()}.
|
||||||
|
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
|
||||||
|
{NewLeasedStreams, RevokedStreams, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
|
||||||
|
Agent0
|
||||||
|
),
|
||||||
|
NewLeasedStreams =/= [] andalso
|
||||||
|
?SLOG(
|
||||||
|
info, #{msg => shared_subs_new_stream_leases, stream_leases => NewLeasedStreams}
|
||||||
|
),
|
||||||
|
S1 = lists:foldl(fun accept_stream/2, S0, NewLeasedStreams),
|
||||||
|
S2 = lists:foldl(fun revoke_stream/2, S1, RevokedStreams),
|
||||||
|
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||||
|
{S2, SharedSubS1}.
|
||||||
|
|
||||||
|
-spec on_streams_replayed(
|
||||||
|
emqx_persistent_session_ds_state:t(),
|
||||||
|
t()
|
||||||
|
) -> {emqx_persistent_session_ds_state:t(), t()}.
|
||||||
|
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
|
||||||
|
%% TODO
|
||||||
|
%% Is it sufficient for a report?
|
||||||
|
Progress = fold_shared_stream_states(
|
||||||
|
fun(TopicFilter, Stream, SRS, Acc) ->
|
||||||
|
#srs{it_begin = BeginIt} = SRS,
|
||||||
|
StreamProgress = #{
|
||||||
|
topic_filter => TopicFilter,
|
||||||
|
stream => Stream,
|
||||||
|
iterator => BeginIt
|
||||||
|
},
|
||||||
|
[StreamProgress | Acc]
|
||||||
|
end,
|
||||||
|
[],
|
||||||
|
S
|
||||||
|
),
|
||||||
|
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||||
|
Agent0, Progress
|
||||||
|
),
|
||||||
|
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||||
|
{S, SharedSubS1}.
|
||||||
|
|
||||||
|
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
|
||||||
|
{emqx_persistent_session_ds_state:t(), t()}.
|
||||||
|
on_info(S, #{agent := Agent0} = SharedSubS0, ?agent_message(Info)) ->
|
||||||
|
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
|
||||||
|
SharedSubS1 = SharedSubS0#{agent => Agent1},
|
||||||
|
{S, SharedSubS1};
|
||||||
|
on_info(S, SharedSubS, _Info) ->
|
||||||
|
%% TODO
|
||||||
|
%% Log warning
|
||||||
|
{S, SharedSubS}.
|
||||||
|
|
||||||
|
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
|
||||||
|
to_map(_S, _SharedSubS) ->
|
||||||
|
%% TODO
|
||||||
|
#{}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
fold_shared_subs(Fun, Acc, S) ->
|
||||||
|
emqx_persistent_session_ds_state:fold_subscriptions(
|
||||||
|
fun
|
||||||
|
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
|
||||||
|
(_, _Sub, Acc0) -> Acc0
|
||||||
|
end,
|
||||||
|
Acc,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
fold_shared_stream_states(Fun, Acc, S) ->
|
||||||
|
%% TODO
|
||||||
|
%% Optimize or cache
|
||||||
|
TopicFilters = fold_shared_subs(
|
||||||
|
fun
|
||||||
|
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
|
||||||
|
Acc0#{Id => TopicFilter};
|
||||||
|
(_, _, Acc0) ->
|
||||||
|
Acc0
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
S
|
||||||
|
),
|
||||||
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
|
fun({SubId, Stream}, SRS, Acc0) ->
|
||||||
|
case TopicFilters of
|
||||||
|
#{SubId := TopicFilter} ->
|
||||||
|
Fun(TopicFilter, Stream, SRS, Acc0);
|
||||||
|
_ ->
|
||||||
|
Acc0
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Acc,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
|
||||||
|
#{max_subscriptions := MaxSubscriptions} = Props,
|
||||||
|
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
|
||||||
|
true ->
|
||||||
|
create_new_subscription(TopicFilter, SubOpts, Session);
|
||||||
|
false ->
|
||||||
|
{error, ?RC_QUOTA_EXCEEDED}
|
||||||
|
end;
|
||||||
|
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
|
||||||
|
update_subscription(Subscription, TopicFilter, SubOpts, Session).
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, create_new_subscription/3}).
|
||||||
|
create_new_subscription(TopicFilter, SubOpts, #{
|
||||||
|
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
|
||||||
|
}) ->
|
||||||
|
case
|
||||||
|
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||||
|
Agent0, TopicFilter, SubOpts
|
||||||
|
)
|
||||||
|
of
|
||||||
|
{ok, Agent1} ->
|
||||||
|
#{upgrade_qos := UpgradeQoS} = Props,
|
||||||
|
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
|
||||||
|
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
|
||||||
|
SState = #{
|
||||||
|
parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts
|
||||||
|
},
|
||||||
|
S3 = emqx_persistent_session_ds_state:put_subscription_state(
|
||||||
|
SStateId, SState, S2
|
||||||
|
),
|
||||||
|
Subscription = #{
|
||||||
|
id => SubId,
|
||||||
|
current_state => SStateId,
|
||||||
|
start_time => now_ms()
|
||||||
|
},
|
||||||
|
S = emqx_persistent_session_ds_state:put_subscription(
|
||||||
|
TopicFilter, Subscription, S3
|
||||||
|
),
|
||||||
|
SharedSubS = SharedSubS0#{agent => Agent1},
|
||||||
|
?tp(persistent_session_ds_shared_subscription_added, #{
|
||||||
|
topic_filter => TopicFilter, session => SessionId
|
||||||
|
}),
|
||||||
|
{ok, S, SharedSubS};
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error
|
||||||
|
end.
|
||||||
|
|
||||||
|
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
|
||||||
|
s := S0, shared_sub_s := SharedSubS, props := Props
|
||||||
|
}) ->
|
||||||
|
#{upgrade_qos := UpgradeQoS} = Props,
|
||||||
|
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
|
||||||
|
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
|
||||||
|
SState ->
|
||||||
|
%% Client resubscribed with the same parameters:
|
||||||
|
{ok, S0, SharedSubS};
|
||||||
|
_ ->
|
||||||
|
%% Subsription parameters changed:
|
||||||
|
{SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
|
||||||
|
S2 = emqx_persistent_session_ds_state:put_subscription_state(
|
||||||
|
SStateId, SState, S1
|
||||||
|
),
|
||||||
|
Sub = Sub0#{current_state => SStateId},
|
||||||
|
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
|
||||||
|
{ok, S, SharedSubS}
|
||||||
|
end.
|
||||||
|
|
||||||
|
lookup(TopicFilter, S) ->
|
||||||
|
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
|
||||||
|
Sub = #{current_state := SStateId} ->
|
||||||
|
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
|
||||||
|
#{subopts := SubOpts} ->
|
||||||
|
Sub#{subopts => SubOpts};
|
||||||
|
undefined ->
|
||||||
|
undefined
|
||||||
|
end;
|
||||||
|
undefined ->
|
||||||
|
undefined
|
||||||
|
end.
|
||||||
|
|
||||||
|
accept_stream(
|
||||||
|
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
|
||||||
|
) ->
|
||||||
|
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
|
||||||
|
undefined ->
|
||||||
|
%% This should not happen.
|
||||||
|
%% Agent should have received unsubscribe callback
|
||||||
|
%% and should not have passed this stream as a new one
|
||||||
|
error(new_stream_without_sub);
|
||||||
|
#{id := SubId, current_state := SStateId} ->
|
||||||
|
Key = {SubId, Stream},
|
||||||
|
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
|
||||||
|
undefined ->
|
||||||
|
NewSRS =
|
||||||
|
#srs{
|
||||||
|
rank_x = ?rank_x,
|
||||||
|
rank_y = ?rank_y,
|
||||||
|
it_begin = Iterator,
|
||||||
|
it_end = Iterator,
|
||||||
|
sub_state_id = SStateId
|
||||||
|
},
|
||||||
|
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
|
||||||
|
S1;
|
||||||
|
_SRS ->
|
||||||
|
S0
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
revoke_stream(
|
||||||
|
#{topic_filter := TopicFilter, stream := Stream}, S0
|
||||||
|
) ->
|
||||||
|
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
|
||||||
|
undefined ->
|
||||||
|
%% This should not happen.
|
||||||
|
%% Agent should have received unsubscribe callback
|
||||||
|
%% and should not have revoked this stream
|
||||||
|
S0;
|
||||||
|
#{id := SubId} ->
|
||||||
|
Key = {SubId, Stream},
|
||||||
|
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
|
||||||
|
undefined ->
|
||||||
|
S0;
|
||||||
|
SRS0 ->
|
||||||
|
SRS1 = SRS0#srs{unsubscribed = true},
|
||||||
|
S1 = emqx_persistent_session_ds_state:put_stream(Key, SRS1, S0),
|
||||||
|
S1
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec to_agent_subscription(
|
||||||
|
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
|
||||||
|
) ->
|
||||||
|
emqx_persistent_session_ds_shared_subs_agent:subscription().
|
||||||
|
to_agent_subscription(_S, Subscription) ->
|
||||||
|
%% TODO
|
||||||
|
%% do we need anything from sub state?
|
||||||
|
maps:with([start_time], Subscription).
|
||||||
|
|
||||||
|
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
|
||||||
|
agent_opts(#{session_id := SessionId, send_funs := SendFuns}) ->
|
||||||
|
#{
|
||||||
|
session_id => SessionId,
|
||||||
|
send_funs => agent_send_funs(SendFuns)
|
||||||
|
}.
|
||||||
|
|
||||||
|
agent_send_funs(#{
|
||||||
|
send := Send,
|
||||||
|
send_after := SendAfter
|
||||||
|
}) ->
|
||||||
|
#{
|
||||||
|
send => fun(Pid, Msg) -> send_from_agent(Send, Pid, Msg) end,
|
||||||
|
send_after => fun(Time, Pid, Msg) ->
|
||||||
|
send_after_from_agent(SendAfter, Time, Pid, Msg)
|
||||||
|
end
|
||||||
|
}.
|
||||||
|
|
||||||
|
send_from_agent(Send, Dest, Msg) ->
|
||||||
|
case Dest =:= self() of
|
||||||
|
true ->
|
||||||
|
Send(Dest, ?agent_message(Msg)),
|
||||||
|
Msg;
|
||||||
|
false ->
|
||||||
|
Send(Dest, Msg)
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_after_from_agent(SendAfter, Time, Dest, Msg) ->
|
||||||
|
case Dest =:= self() of
|
||||||
|
true ->
|
||||||
|
SendAfter(Time, Dest, ?agent_message(Msg));
|
||||||
|
false ->
|
||||||
|
SendAfter(Time, Dest, Msg)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, now_ms/0}).
|
||||||
|
now_ms() ->
|
||||||
|
erlang:system_time(millisecond).
|
|
@ -0,0 +1,112 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_ds_shared_subs_agent).
|
||||||
|
|
||||||
|
-include("shared_subs_agent.hrl").
|
||||||
|
|
||||||
|
-type session_id() :: emqx_persistent_session_ds:id().
|
||||||
|
|
||||||
|
-type subscription() :: #{
|
||||||
|
start_time := emqx_ds:time()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type t() :: term().
|
||||||
|
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
|
||||||
|
|
||||||
|
-type opts() :: #{
|
||||||
|
session_id := session_id(),
|
||||||
|
send_funs := #{
|
||||||
|
send := fun((pid(), term()) -> term()),
|
||||||
|
send_after := fun((non_neg_integer(), pid(), term()) -> reference())
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
|
%% TODO
|
||||||
|
%% This records goe through network, we better shrink them
|
||||||
|
%% * use integer keys
|
||||||
|
%% * somehow avoid passing stream and topic_filter — they both are part of the iterator
|
||||||
|
-type stream_lease() :: #{
|
||||||
|
%% Used as "external" subscription_id
|
||||||
|
topic_filter := topic_filter(),
|
||||||
|
stream := emqx_ds:stream(),
|
||||||
|
iterator := emqx_ds:iterator()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type stream_revoke() :: #{
|
||||||
|
topic_filter := topic_filter(),
|
||||||
|
stream := emqx_ds:stream()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-type stream_progress() :: #{
|
||||||
|
topic_filter := topic_filter(),
|
||||||
|
stream := emqx_ds:stream(),
|
||||||
|
iterator := emqx_ds:iterator()
|
||||||
|
}.
|
||||||
|
|
||||||
|
-export_type([
|
||||||
|
t/0,
|
||||||
|
subscription/0,
|
||||||
|
session_id/0,
|
||||||
|
stream_lease/0,
|
||||||
|
opts/0
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new/1,
|
||||||
|
open/2,
|
||||||
|
|
||||||
|
on_subscribe/3,
|
||||||
|
on_unsubscribe/2,
|
||||||
|
on_stream_progress/2,
|
||||||
|
on_info/2,
|
||||||
|
|
||||||
|
renew_streams/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Behaviour
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-callback new(opts()) -> t().
|
||||||
|
-callback open([{topic_filter(), subscription()}], opts()) -> t().
|
||||||
|
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
|
||||||
|
{ok, t()} | {error, term()}.
|
||||||
|
-callback on_unsubscribe(t(), topic_filter()) -> t().
|
||||||
|
-callback renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
|
||||||
|
-callback on_stream_progress(t(), [stream_progress()]) -> t().
|
||||||
|
-callback on_info(t(), term()) -> t().
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec new(opts()) -> t().
|
||||||
|
new(Opts) ->
|
||||||
|
?shared_subs_agent:new(Opts).
|
||||||
|
|
||||||
|
-spec open([{topic_filter(), subscription()}], opts()) -> t().
|
||||||
|
open(Topics, Opts) ->
|
||||||
|
?shared_subs_agent:open(Topics, Opts).
|
||||||
|
|
||||||
|
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
|
||||||
|
{ok, t()} | {error, emqx_types:reason_code()}.
|
||||||
|
on_subscribe(Agent, TopicFilter, SubOpts) ->
|
||||||
|
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
|
||||||
|
|
||||||
|
-spec on_unsubscribe(t(), topic_filter()) -> t().
|
||||||
|
on_unsubscribe(Agent, TopicFilter) ->
|
||||||
|
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
|
||||||
|
|
||||||
|
-spec renew_streams(t()) -> {[stream_lease()], [stream_revoke()], t()}.
|
||||||
|
renew_streams(Agent) ->
|
||||||
|
?shared_subs_agent:renew_streams(Agent).
|
||||||
|
|
||||||
|
-spec on_stream_progress(t(), [stream_progress()]) -> t().
|
||||||
|
on_stream_progress(Agent, StreamProgress) ->
|
||||||
|
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).
|
||||||
|
|
||||||
|
-spec on_info(t(), term()) -> t().
|
||||||
|
on_info(Agent, Info) ->
|
||||||
|
?shared_subs_agent:on_info(Agent, Info).
|
|
@ -0,0 +1,46 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_ds_shared_subs_null_agent).
|
||||||
|
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new/1,
|
||||||
|
open/2,
|
||||||
|
|
||||||
|
on_subscribe/3,
|
||||||
|
on_unsubscribe/2,
|
||||||
|
on_stream_progress/2,
|
||||||
|
on_info/2,
|
||||||
|
|
||||||
|
renew_streams/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-behaviour(emqx_persistent_session_ds_shared_subs_agent).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
new(_Opts) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
open(_Topics, _Opts) ->
|
||||||
|
undefined.
|
||||||
|
|
||||||
|
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
|
||||||
|
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
|
||||||
|
|
||||||
|
on_unsubscribe(Agent, _TopicFilter) ->
|
||||||
|
Agent.
|
||||||
|
|
||||||
|
renew_streams(Agent) ->
|
||||||
|
{[], [], Agent}.
|
||||||
|
|
||||||
|
on_stream_progress(Agent, _StreamProgress) ->
|
||||||
|
Agent.
|
||||||
|
|
||||||
|
on_info(Agent, _Info) ->
|
||||||
|
Agent.
|
|
@ -127,7 +127,12 @@ renew_streams(S0) ->
|
||||||
S1 = remove_unsubscribed_streams(S0),
|
S1 = remove_unsubscribed_streams(S0),
|
||||||
S2 = remove_fully_replayed_streams(S1),
|
S2 = remove_fully_replayed_streams(S1),
|
||||||
S3 = update_stream_subscription_state_ids(S2),
|
S3 = update_stream_subscription_state_ids(S2),
|
||||||
emqx_persistent_session_ds_subs:fold(
|
%% For shared subscriptions, the streams are populated by
|
||||||
|
%% `emqx_persistent_session_ds_shared_subs`.
|
||||||
|
%% TODO
|
||||||
|
%% Move discovery of proper streams
|
||||||
|
%% out of the scheduler for complete symmetry?
|
||||||
|
fold_proper_subscriptions(
|
||||||
fun
|
fun
|
||||||
(Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) ->
|
(Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) ->
|
||||||
TopicFilter = emqx_topic:words(Key),
|
TopicFilter = emqx_topic:words(Key),
|
||||||
|
@ -206,9 +211,6 @@ ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream
|
||||||
Key = {SubId, Stream},
|
Key = {SubId, Stream},
|
||||||
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
||||||
undefined ->
|
undefined ->
|
||||||
?SLOG(debug, #{
|
|
||||||
msg => new_stream, key => Key, stream => Stream
|
|
||||||
}),
|
|
||||||
case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
|
case emqx_ds:make_iterator(?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime) of
|
||||||
{ok, Iterator} ->
|
{ok, Iterator} ->
|
||||||
NewStreamState = #srs{
|
NewStreamState = #srs{
|
||||||
|
@ -420,3 +422,13 @@ shuffle(L0) ->
|
||||||
L2 = lists:sort(L1),
|
L2 = lists:sort(L1),
|
||||||
{_, L} = lists:unzip(L2),
|
{_, L} = lists:unzip(L2),
|
||||||
L.
|
L.
|
||||||
|
|
||||||
|
fold_proper_subscriptions(Fun, Acc, S) ->
|
||||||
|
emqx_persistent_session_ds_state:fold_subscriptions(
|
||||||
|
fun
|
||||||
|
(#share{}, _Sub, Acc0) -> Acc0;
|
||||||
|
(TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0)
|
||||||
|
end,
|
||||||
|
Acc,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
|
@ -30,8 +30,7 @@
|
||||||
on_session_drop/2,
|
on_session_drop/2,
|
||||||
gc/1,
|
gc/1,
|
||||||
lookup/2,
|
lookup/2,
|
||||||
to_map/1,
|
to_map/1
|
||||||
fold/3
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Management API:
|
%% Management API:
|
||||||
|
@ -160,7 +159,7 @@ on_unsubscribe(SessionId, TopicFilter, S0) ->
|
||||||
|
|
||||||
-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
|
-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
|
||||||
on_session_drop(SessionId, S0) ->
|
on_session_drop(SessionId, S0) ->
|
||||||
fold(
|
_ = fold_proper_subscriptions(
|
||||||
fun(TopicFilter, _Subscription, S) ->
|
fun(TopicFilter, _Subscription, S) ->
|
||||||
case on_unsubscribe(SessionId, TopicFilter, S) of
|
case on_unsubscribe(SessionId, TopicFilter, S) of
|
||||||
{ok, S1, _} -> S1;
|
{ok, S1, _} -> S1;
|
||||||
|
@ -169,10 +168,14 @@ on_session_drop(SessionId, S0) ->
|
||||||
end,
|
end,
|
||||||
S0,
|
S0,
|
||||||
S0
|
S0
|
||||||
).
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% @doc Remove subscription states that don't have a parent, and that
|
%% @doc Remove subscription states that don't have a parent, and that
|
||||||
%% don't have any unacked messages:
|
%% don't have any unacked messages.
|
||||||
|
%% TODO
|
||||||
|
%% This function collects shared subs as well
|
||||||
|
%% Move to a separate module to keep symmetry?
|
||||||
-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
||||||
gc(S0) ->
|
gc(S0) ->
|
||||||
%% Create a set of subscription states IDs referenced either by a
|
%% Create a set of subscription states IDs referenced either by a
|
||||||
|
@ -210,7 +213,7 @@ gc(S0) ->
|
||||||
S0
|
S0
|
||||||
).
|
).
|
||||||
|
|
||||||
%% @doc Fold over active subscriptions:
|
%% @doc Lookup a subscription and merge it with its current state:
|
||||||
-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
|
-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
|
||||||
emqx_persistent_session_ds:subscription() | undefined.
|
emqx_persistent_session_ds:subscription() | undefined.
|
||||||
lookup(TopicFilter, S) ->
|
lookup(TopicFilter, S) ->
|
||||||
|
@ -230,22 +233,12 @@ lookup(TopicFilter, S) ->
|
||||||
%% purpose:
|
%% purpose:
|
||||||
-spec to_map(emqx_persistent_session_ds_state:t()) -> map().
|
-spec to_map(emqx_persistent_session_ds_state:t()) -> map().
|
||||||
to_map(S) ->
|
to_map(S) ->
|
||||||
fold(
|
fold_proper_subscriptions(
|
||||||
fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end,
|
fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end,
|
||||||
#{},
|
#{},
|
||||||
S
|
S
|
||||||
).
|
).
|
||||||
|
|
||||||
%% @doc Fold over active subscriptions:
|
|
||||||
-spec fold(
|
|
||||||
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
|
|
||||||
Acc,
|
|
||||||
emqx_persistent_session_ds_state:t()
|
|
||||||
) ->
|
|
||||||
Acc.
|
|
||||||
fold(Fun, Acc, S) ->
|
|
||||||
emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S).
|
|
||||||
|
|
||||||
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
|
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
|
||||||
emqx_persistent_session_ds:subscription() | undefined.
|
emqx_persistent_session_ds:subscription() | undefined.
|
||||||
cold_get_subscription(SessionId, Topic) ->
|
cold_get_subscription(SessionId, Topic) ->
|
||||||
|
@ -267,5 +260,15 @@ cold_get_subscription(SessionId, Topic) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
|
fold_proper_subscriptions(Fun, Acc, S) ->
|
||||||
|
emqx_persistent_session_ds_state:fold_subscriptions(
|
||||||
|
fun
|
||||||
|
(#share{}, _Sub, Acc0) -> Acc0;
|
||||||
|
(TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0)
|
||||||
|
end,
|
||||||
|
Acc,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
now_ms() ->
|
now_ms() ->
|
||||||
erlang:system_time(millisecond).
|
erlang:system_time(millisecond).
|
||||||
|
|
|
@ -71,4 +71,11 @@
|
||||||
sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id()
|
sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
%% (Erlang) messages that session should forward to the
|
||||||
|
%% shared subscription handler.
|
||||||
|
-record(shared_sub_message, {
|
||||||
|
message :: term()
|
||||||
|
}).
|
||||||
|
-define(shared_sub_message(MSG), #shared_sub_message{message = MSG}).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-ifndef(SHARED_SUBS_AGENT_HRL).
|
||||||
|
-define(SHARED_SUBS_AGENT_HRL, true).
|
||||||
|
|
||||||
|
-ifdef(EMQX_RELEASE_EDITION).
|
||||||
|
|
||||||
|
-if(?EMQX_RELEASE_EDITION == ee).
|
||||||
|
|
||||||
|
%% agent from BSL app
|
||||||
|
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
|
||||||
|
%% Till full implementation we need to dispach to the null agent.
|
||||||
|
%% It will report "not implemented" error for attempts to use shared subscriptions.
|
||||||
|
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
|
||||||
|
|
||||||
|
%% -if(?EMQX_RELEASE_EDITION == ee).
|
||||||
|
-else.
|
||||||
|
|
||||||
|
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
|
||||||
|
|
||||||
|
%% -if(?EMQX_RELEASE_EDITION == ee).
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
%% -ifdef(EMQX_RELEASE_EDITION).
|
||||||
|
-else.
|
||||||
|
|
||||||
|
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
|
||||||
|
|
||||||
|
%% -ifdef(EMQX_RELEASE_EDITION).
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
-endif.
|
|
@ -83,6 +83,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
deliver/3,
|
deliver/3,
|
||||||
|
handle_info/2,
|
||||||
handle_timeout/3,
|
handle_timeout/3,
|
||||||
disconnect/3,
|
disconnect/3,
|
||||||
terminate/3
|
terminate/3
|
||||||
|
@ -188,6 +189,10 @@
|
||||||
-callback destroy(t() | clientinfo()) -> ok.
|
-callback destroy(t() | clientinfo()) -> ok.
|
||||||
-callback clear_will_message(t()) -> t().
|
-callback clear_will_message(t()) -> t().
|
||||||
-callback publish_will_message_now(t(), message()) -> t().
|
-callback publish_will_message_now(t(), message()) -> t().
|
||||||
|
-callback handle_timeout(clientinfo(), common_timer_name() | custom_timer_name(), t()) ->
|
||||||
|
{ok, replies(), t()}
|
||||||
|
| {ok, replies(), timeout(), t()}.
|
||||||
|
-callback handle_info(term(), t()) -> t().
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Create a Session
|
%% Create a Session
|
||||||
|
@ -484,6 +489,14 @@ enrich_subopts(_Opt, _V, Msg, _) ->
|
||||||
handle_timeout(ClientInfo, Timer, Session) ->
|
handle_timeout(ClientInfo, Timer, Session) ->
|
||||||
?IMPL(Session):handle_timeout(ClientInfo, Timer, Session).
|
?IMPL(Session):handle_timeout(ClientInfo, Timer, Session).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Generic Messages
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_info(term(), t()) -> t().
|
||||||
|
handle_info(Info, Session) ->
|
||||||
|
?IMPL(Session):handle_info(Info, Session).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec ensure_timer(custom_timer_name(), timeout(), map()) ->
|
-spec ensure_timer(custom_timer_name(), timeout(), map()) ->
|
||||||
|
|
|
@ -87,6 +87,7 @@
|
||||||
deliver/3,
|
deliver/3,
|
||||||
replay/3,
|
replay/3,
|
||||||
handle_timeout/3,
|
handle_timeout/3,
|
||||||
|
handle_info/2,
|
||||||
disconnect/2,
|
disconnect/2,
|
||||||
terminate/2
|
terminate/2
|
||||||
]).
|
]).
|
||||||
|
@ -597,6 +598,15 @@ handle_timeout(ClientInfo, retry_delivery, Session) ->
|
||||||
handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
|
handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
|
||||||
expire(ClientInfo, Session).
|
expire(ClientInfo, Session).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Geneic messages
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec handle_info(term(), session()) -> session().
|
||||||
|
handle_info(Msg, Session) ->
|
||||||
|
?SLOG(warning, #{msg => emqx_session_mem_unknown_message, message => Msg}),
|
||||||
|
Session.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Retry Delivery
|
%% Retry Delivery
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Licensor: Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Licensed Work: EMQX Enterprise Edition
|
||||||
|
The Licensed Work is (c) 2024
|
||||||
|
Hangzhou EMQ Technologies Co., Ltd.
|
||||||
|
Additional Use Grant: Students and educators are granted right to copy,
|
||||||
|
modify, and create derivative work for research
|
||||||
|
or education.
|
||||||
|
Change Date: 2028-05-30
|
||||||
|
Change License: Apache License, Version 2.0
|
||||||
|
|
||||||
|
For information about alternative licensing arrangements for the Software,
|
||||||
|
please contact Licensor: https://www.emqx.com/en/contact
|
||||||
|
|
||||||
|
Notice
|
||||||
|
|
||||||
|
The Business Source License (this document, or the “License”) is not an Open
|
||||||
|
Source license. However, the Licensed Work will eventually be made available
|
||||||
|
under an Open Source License, as stated in this License.
|
||||||
|
|
||||||
|
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
|
||||||
|
“Business Source License” is a trademark of MariaDB Corporation Ab.
|
||||||
|
|
||||||
|
-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
Business Source License 1.1
|
||||||
|
|
||||||
|
Terms
|
||||||
|
|
||||||
|
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||||
|
works, redistribute, and make non-production use of the Licensed Work. The
|
||||||
|
Licensor may make an Additional Use Grant, above, permitting limited
|
||||||
|
production use.
|
||||||
|
|
||||||
|
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||||
|
available distribution of a specific version of the Licensed Work under this
|
||||||
|
License, whichever comes first, the Licensor hereby grants you rights under
|
||||||
|
the terms of the Change License, and the rights granted in the paragraph
|
||||||
|
above terminate.
|
||||||
|
|
||||||
|
If your use of the Licensed Work does not comply with the requirements
|
||||||
|
currently in effect as described in this License, you must purchase a
|
||||||
|
commercial license from the Licensor, its affiliated entities, or authorized
|
||||||
|
resellers, or you must refrain from using the Licensed Work.
|
||||||
|
|
||||||
|
All copies of the original and modified Licensed Work, and derivative works
|
||||||
|
of the Licensed Work, are subject to this License. This License applies
|
||||||
|
separately for each version of the Licensed Work and the Change Date may vary
|
||||||
|
for each version of the Licensed Work released by Licensor.
|
||||||
|
|
||||||
|
You must conspicuously display this License on each original or modified copy
|
||||||
|
of the Licensed Work. If you receive the Licensed Work in original or
|
||||||
|
modified form from a third party, the terms and conditions set forth in this
|
||||||
|
License apply to your use of that work.
|
||||||
|
|
||||||
|
Any use of the Licensed Work in violation of this License will automatically
|
||||||
|
terminate your rights under this License for the current and all other
|
||||||
|
versions of the Licensed Work.
|
||||||
|
|
||||||
|
This License does not grant you any right in any trademark or logo of
|
||||||
|
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||||
|
Licensor as expressly required by this License).
|
||||||
|
|
||||||
|
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||||
|
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||||
|
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||||
|
TITLE.
|
||||||
|
|
||||||
|
MariaDB hereby grants you permission to use this License’s text to license
|
||||||
|
your works, and to refer to it using the trademark “Business Source License”,
|
||||||
|
as long as you comply with the Covenants of Licensor below.
|
||||||
|
|
||||||
|
Covenants of Licensor
|
||||||
|
|
||||||
|
In consideration of the right to use this License’s text and the “Business
|
||||||
|
Source License” name and trademark, Licensor covenants to MariaDB, and to all
|
||||||
|
other recipients of the licensed work to be provided by Licensor:
|
||||||
|
|
||||||
|
1. To specify as the Change License the GPL Version 2.0 or any later version,
|
||||||
|
or a license that is compatible with GPL Version 2.0 or a later version,
|
||||||
|
where “compatible” means that software provided under the Change License can
|
||||||
|
be included in a program with software provided under GPL Version 2.0 or a
|
||||||
|
later version. Licensor may specify additional Change Licenses without
|
||||||
|
limitation.
|
||||||
|
|
||||||
|
2. To either: (a) specify an additional grant of rights to use that does not
|
||||||
|
impose any additional restriction on the right granted in this License, as
|
||||||
|
the Additional Use Grant; or (b) insert the text “None”.
|
||||||
|
|
||||||
|
3. To specify a Change Date.
|
||||||
|
|
||||||
|
4. Not to modify this License in any other way.
|
|
@ -0,0 +1,9 @@
|
||||||
|
# EMQX Durable Shared Subscriptions
|
||||||
|
|
||||||
|
# Contributing
|
||||||
|
|
||||||
|
Please see our [contributing.md](../../CONTRIBUTING.md).
|
||||||
|
|
||||||
|
# License
|
||||||
|
|
||||||
|
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
|
|
@ -0,0 +1,6 @@
|
||||||
|
%% -*- mode: erlang; -*-
|
||||||
|
|
||||||
|
{erl_opts, [debug_info]}.
|
||||||
|
{deps, [
|
||||||
|
{emqx, {path, "../../apps/emqx"}}
|
||||||
|
]}.
|
|
@ -0,0 +1,15 @@
|
||||||
|
{application, emqx_ds_shared_sub, [
|
||||||
|
{description, "EMQX DS Shared Subscriptions"},
|
||||||
|
{vsn, "0.1.0"},
|
||||||
|
{registered, [emqx_ds_shared_sub_sup]},
|
||||||
|
{mod, {emqx_ds_shared_sub_app, []}},
|
||||||
|
{applications, [
|
||||||
|
kernel,
|
||||||
|
stdlib,
|
||||||
|
emqx
|
||||||
|
]},
|
||||||
|
{env, []},
|
||||||
|
{modules, []},
|
||||||
|
|
||||||
|
{links, []}
|
||||||
|
]}.
|
|
@ -0,0 +1,156 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ds_shared_sub_agent).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx_persistent_message.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||||
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-export([
|
||||||
|
new/1,
|
||||||
|
open/2,
|
||||||
|
|
||||||
|
on_subscribe/3,
|
||||||
|
on_unsubscribe/2,
|
||||||
|
on_stream_progress/2,
|
||||||
|
on_info/2,
|
||||||
|
|
||||||
|
renew_streams/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-behaviour(emqx_persistent_session_ds_shared_subs_agent).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
new(Opts) ->
|
||||||
|
init_state(Opts).
|
||||||
|
|
||||||
|
open(TopicSubscriptions, Opts) ->
|
||||||
|
State0 = init_state(Opts),
|
||||||
|
State1 = lists:foldl(
|
||||||
|
fun({ShareTopicFilter, #{start_time := StartTime}}, State) ->
|
||||||
|
add_subscription(State, ShareTopicFilter, StartTime)
|
||||||
|
end,
|
||||||
|
State0,
|
||||||
|
TopicSubscriptions
|
||||||
|
),
|
||||||
|
State1.
|
||||||
|
|
||||||
|
on_subscribe(State0, TopicFilter, _SubOpts) ->
|
||||||
|
StartTime = now_ms(),
|
||||||
|
State1 = add_subscription(State0, TopicFilter, StartTime),
|
||||||
|
{ok, State1}.
|
||||||
|
|
||||||
|
on_unsubscribe(State, TopicFilter) ->
|
||||||
|
delete_subscription(State, TopicFilter).
|
||||||
|
|
||||||
|
renew_streams(State0) ->
|
||||||
|
State1 = do_renew_streams(State0),
|
||||||
|
{State2, StreamLeases} = stream_leases(State1),
|
||||||
|
{StreamLeases, [], State2}.
|
||||||
|
|
||||||
|
on_stream_progress(State, _StreamProgress) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
on_info(State, _Info) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init_state(Opts) ->
|
||||||
|
SessionId = maps:get(session_id, Opts),
|
||||||
|
SendFuns = maps:get(send_funs, Opts),
|
||||||
|
Send = maps:get(send, SendFuns),
|
||||||
|
SendAfter = maps:get(send_after, SendFuns),
|
||||||
|
#{
|
||||||
|
session_id => SessionId,
|
||||||
|
send => Send,
|
||||||
|
end_after => SendAfter,
|
||||||
|
subscriptions => #{}
|
||||||
|
}.
|
||||||
|
|
||||||
|
% send(State, Pid, Msg) ->
|
||||||
|
% Send = maps:get(send, State),
|
||||||
|
% Send(Pid, Msg).
|
||||||
|
|
||||||
|
% send_after(State, Time, Pid, Msg) ->
|
||||||
|
% SendAfter = maps:get(send_after, State),
|
||||||
|
% SendAfter(Time, Pid, Msg).
|
||||||
|
|
||||||
|
do_renew_streams(#{subscriptions := Subs0} = State0) ->
|
||||||
|
Subs1 = maps:map(
|
||||||
|
fun(
|
||||||
|
ShareTopicFilter,
|
||||||
|
#{start_time := StartTime, streams := Streams0, stream_leases := StreamLeases} = Sub
|
||||||
|
) ->
|
||||||
|
#share{topic = TopicFilterRaw} = ShareTopicFilter,
|
||||||
|
TopicFilter = emqx_topic:words(TopicFilterRaw),
|
||||||
|
{_, NewStreams} = lists:unzip(
|
||||||
|
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
|
||||||
|
),
|
||||||
|
{Streams1, NewLeases} = lists:foldl(
|
||||||
|
fun(Stream, {StreamsAcc, LeasesAcc}) ->
|
||||||
|
case StreamsAcc of
|
||||||
|
#{Stream := _} ->
|
||||||
|
{StreamsAcc, LeasesAcc};
|
||||||
|
_ ->
|
||||||
|
{ok, It} = emqx_ds:make_iterator(
|
||||||
|
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
|
||||||
|
),
|
||||||
|
StreamLease = #{
|
||||||
|
topic_filter => ShareTopicFilter,
|
||||||
|
stream => Stream,
|
||||||
|
iterator => It
|
||||||
|
},
|
||||||
|
{StreamsAcc#{Stream => It}, [StreamLease | LeasesAcc]}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{Streams0, []},
|
||||||
|
NewStreams
|
||||||
|
),
|
||||||
|
Sub#{streams => Streams1, stream_leases => StreamLeases ++ NewLeases}
|
||||||
|
end,
|
||||||
|
Subs0
|
||||||
|
),
|
||||||
|
State0#{subscriptions => Subs1}.
|
||||||
|
|
||||||
|
delete_subscription(#{session_id := SessionId, subscriptions := Subs0} = State0, ShareTopicFilter) ->
|
||||||
|
#share{topic = TopicFilter} = ShareTopicFilter,
|
||||||
|
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId),
|
||||||
|
Subs1 = maps:remove(ShareTopicFilter, Subs0),
|
||||||
|
State0#{subscriptions => Subs1}.
|
||||||
|
|
||||||
|
stream_leases(#{subscriptions := Subs0} = State0) ->
|
||||||
|
{Subs1, StreamLeases} = lists:foldl(
|
||||||
|
fun({TopicFilter, #{stream_leases := Leases} = Sub}, {SubsAcc, LeasesAcc}) ->
|
||||||
|
{SubsAcc#{TopicFilter => Sub#{stream_leases => []}}, [Leases | LeasesAcc]}
|
||||||
|
end,
|
||||||
|
{Subs0, []},
|
||||||
|
maps:to_list(Subs0)
|
||||||
|
),
|
||||||
|
State1 = State0#{subscriptions => Subs1},
|
||||||
|
{State1, lists:concat(StreamLeases)}.
|
||||||
|
|
||||||
|
now_ms() ->
|
||||||
|
erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
add_subscription(
|
||||||
|
#{subscriptions := Subs0, session_id := SessionId} = State0, ShareTopicFilter, StartTime
|
||||||
|
) ->
|
||||||
|
#share{topic = TopicFilter} = ShareTopicFilter,
|
||||||
|
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
|
||||||
|
Subs1 = Subs0#{
|
||||||
|
ShareTopicFilter => #{
|
||||||
|
start_time => StartTime,
|
||||||
|
streams => #{},
|
||||||
|
stream_leases => []
|
||||||
|
}
|
||||||
|
},
|
||||||
|
State1 = State0#{subscriptions => Subs1},
|
||||||
|
State1.
|
|
@ -0,0 +1,23 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ds_shared_sub_app).
|
||||||
|
|
||||||
|
-behaviour(application).
|
||||||
|
|
||||||
|
%% application behaviour callbacks
|
||||||
|
-export([start/2, stop/1]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% application behaviour callbacks
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec start(application:start_type(), term()) -> {ok, pid()}.
|
||||||
|
start(_Type, _Args) ->
|
||||||
|
{ok, Sup} = emqx_ds_shared_sub_sup:start_link(),
|
||||||
|
{ok, Sup}.
|
||||||
|
|
||||||
|
-spec stop(term()) -> ok.
|
||||||
|
stop(_State) ->
|
||||||
|
ok.
|
|
@ -0,0 +1,33 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_ds_shared_sub_sup).
|
||||||
|
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
%% supervisor behaviour callbacks
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% API
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
start_link() ->
|
||||||
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% supervisor behaviour callbacks
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = #{
|
||||||
|
strategy => one_for_one,
|
||||||
|
intensity => 10,
|
||||||
|
period => 10
|
||||||
|
},
|
||||||
|
ChildSpecs = [],
|
||||||
|
{ok, {SupFlags, ChildSpecs}}.
|
|
@ -130,7 +130,8 @@
|
||||||
emqx_gateway_ocpp,
|
emqx_gateway_ocpp,
|
||||||
emqx_gateway_jt808,
|
emqx_gateway_jt808,
|
||||||
emqx_bridge_syskeeper,
|
emqx_bridge_syskeeper,
|
||||||
emqx_bridge_confluent
|
emqx_bridge_confluent,
|
||||||
|
emqx_ds_shared_sub
|
||||||
],
|
],
|
||||||
%% must always be of type `load'
|
%% must always be of type `load'
|
||||||
ce_business_apps =>
|
ce_business_apps =>
|
||||||
|
|
3
mix.exs
3
mix.exs
|
@ -202,7 +202,8 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
:emqx_gateway_gbt32960,
|
:emqx_gateway_gbt32960,
|
||||||
:emqx_gateway_ocpp,
|
:emqx_gateway_ocpp,
|
||||||
:emqx_gateway_jt808,
|
:emqx_gateway_jt808,
|
||||||
:emqx_bridge_syskeeper
|
:emqx_bridge_syskeeper,
|
||||||
|
:emqx_ds_shared_sub
|
||||||
])
|
])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,7 @@ is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_schema_validation") -> false;
|
is_community_umbrella_app("apps/emqx_schema_validation") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
|
is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
|
||||||
is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
|
is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
|
||||||
|
is_community_umbrella_app("apps/emqx_ds_shared_sub") -> false;
|
||||||
is_community_umbrella_app(_) -> true.
|
is_community_umbrella_app(_) -> true.
|
||||||
|
|
||||||
%% BUILD_WITHOUT_JQ
|
%% BUILD_WITHOUT_JQ
|
||||||
|
|
Loading…
Reference in New Issue