Merge pull request #13407 from savonarola/0701-shared-sub

Implement shared subscriptions
This commit is contained in:
Ilia Averianov 2024-07-30 16:12:13 +03:00 committed by GitHub
commit 359bc38aa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 4124 additions and 652 deletions

View File

@ -65,9 +65,20 @@
%% Route
%%--------------------------------------------------------------------
-record(share_dest, {
session_id :: emqx_session:session_id(),
group :: emqx_types:group()
}).
-record(route, {
topic :: binary(),
dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest()
dest ::
node()
| {binary(), node()}
| emqx_session:session_id()
%% One session can also have multiple subscriptions to the same topic through different groups
| #share_dest{}
| emqx_external_broker:dest()
}).
%%--------------------------------------------------------------------

View File

@ -27,6 +27,7 @@
{emqx_ds,2}.
{emqx_ds,3}.
{emqx_ds,4}.
{emqx_ds_shared_sub,1}.
{emqx_eviction_agent,1}.
{emqx_eviction_agent,2}.
{emqx_eviction_agent,3}.

View File

@ -43,7 +43,9 @@
add_shared_route/2,
delete_shared_route/2,
add_persistent_route/2,
delete_persistent_route/2
delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3
]).
-export_type([dest/0]).
@ -129,6 +131,12 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, ID), ok).
add_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
delete_persistent_shared_route(Topic, Group, ID) ->
?safe_with_provider(?FUNCTION_NAME(Topic, Group, ID), ok).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo),
{ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0),
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0),
%% `gc` and `renew_streams` methods may drop unsubscribed streams.
%% Shared subscription handler must have a chance to see unsubscribed streams
%% in the fully replayed state.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:pre_renew_streams(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS,
@ -757,7 +761,7 @@ skip_batch(StreamKey, SRS0, Session = #{s := S0}, ClientInfo, Reason) ->
%%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
disconnect(Session = #{id := Id, s := S0, shared_sub_s := SharedSubS0}, ConnInfo) ->
S1 = maybe_set_offline_info(S0, Id),
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
@ -767,8 +771,9 @@ disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
_ ->
S2
end,
S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}.
{S4, SharedSubS} = emqx_persistent_session_ds_shared_subs:on_disconnect(S3, SharedSubS0),
S = emqx_persistent_session_ds_state:commit(S4),
{shutdown, Session#{s => S, shared_sub_s => SharedSubS}}.
-spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, Session = #{id := Id, s := S}) ->
@ -816,10 +821,12 @@ list_client_subscriptions(ClientId) ->
{error, not_found}
end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
get_client_subscription(ClientId, #share{} = ShareTopicFilter) ->
emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter);
get_client_subscription(ClientId, TopicFilter) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter).
%%--------------------------------------------------------------------
%% Session tables operations
@ -986,14 +993,14 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
%% Normal replay:
%%--------------------------------------------------------------------
fetch_new_messages(Session0 = #{s := S0}, ClientInfo) ->
LFS = maps:get(last_fetched_stream, Session0, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S0),
fetch_new_messages(Session0 = #{s := S0, shared_sub_s := SharedSubS0}, ClientInfo) ->
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
Session1 = Session0#{s => S1, shared_sub_s => SharedSubS1},
LFS = maps:get(last_fetched_stream, Session1, beginning),
ItStream = emqx_persistent_session_ds_stream_scheduler:iter_next_streams(LFS, S1),
BatchSize = get_config(ClientInfo, [batch_size]),
Session1 = fetch_new_messages(ItStream, BatchSize, Session0, ClientInfo),
#{s := S1, shared_sub_s := SharedSubS0} = Session1,
{S2, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replayed(S1, SharedSubS0),
Session1#{s => S2, shared_sub_s => SharedSubS1}.
Session2 = fetch_new_messages(ItStream, BatchSize, Session1, ClientInfo),
Session2#{shared_sub_s => SharedSubS1}.
fetch_new_messages(ItStream0, BatchSize, Session0, ClientInfo) ->
#{inflight := Inflight} = Session0,

View File

@ -17,7 +17,7 @@
-module(emqx_persistent_session_ds_router).
-include("emqx.hrl").
-include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
-include("emqx_ps_ds_int.hrl").
-export([init_tables/0]).
@ -47,7 +47,7 @@
-endif.
-type route() :: #ps_route{}.
-type dest() :: emqx_persistent_session_ds:id().
-type dest() :: emqx_persistent_session_ds:id() | #share_dest{}.
-export_type([dest/0, route/0]).
@ -161,7 +161,7 @@ topics() ->
print_routes(Topic) ->
lists:foreach(
fun(#ps_route{topic = To, dest = Dest}) ->
io:format("~ts -> ~ts~n", [To, Dest])
io:format("~ts -> ~tp~n", [To, Dest])
end,
match_routes(Topic)
).
@ -247,6 +247,8 @@ mk_filtertab_fold_fun(FoldFun) ->
match_filters(Topic) ->
emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
get_dest_session_id(#share_dest{session_id = DSSessionId}) ->
DSSessionId;
get_dest_session_id({_, DSSessionId}) ->
DSSessionId;
get_dest_session_id(DSSessionId) ->

View File

@ -2,11 +2,37 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc This module
%% * handles creation and management of _shared_ subscriptions for the session;
%% * provides streams to the session;
%% * handles progress of stream replay.
%%
%% The logic is quite straightforward; most of the parts resemble the logic of the
%% `emqx_persistent_session_ds_subs` (subscribe/unsubscribe) and
%% `emqx_persistent_session_ds_scheduler` (providing new streams),
%% but some data is sent or received from the `emqx_persistent_session_ds_shared_subs_agent`
%% which communicates with remote shared subscription leaders.
%%
%% A tricky part is the concept of "scheduled actions". When we unsubscribe from a topic
%% we may have some streams that have unacked messages. So we do not have a reliable
%% progress for them. Sending the current progress to the leader and disconnecting
%% will lead to the duplication of messages. So after unsubscription, we need to wait
%% some time until all streams are acked, and only then we disconnect from the leader.
%%
%% For this purpose we have the `scheduled_actions` map in the state of the module.
%% We preserve there the streams that we need to wait for and collect their progress.
%% We also use `scheduled_actions` for resubscriptions. If a client quickly resubscribes
%% after unsubscription, we may still have the mentioned streams unacked. If we abandon
%% them, just connect to the leader, then it may lease us the same streams again, but with
%% the previous progress. So messages may duplicate.
-module(emqx_persistent_session_ds_shared_subs).
-include("emqx_mqtt.hrl").
-include("emqx.hrl").
-include("logger.hrl").
-include("session_internals.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-export([
@ -15,16 +41,51 @@
on_subscribe/3,
on_unsubscribe/4,
on_disconnect/2,
on_streams_replayed/2,
on_streams_replay/2,
on_info/3,
pre_renew_streams/2,
renew_streams/2,
to_map/2
]).
%% Management API:
-export([
cold_get_subscription/2
]).
-define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe).
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type agent_stream_progress() :: #{
stream := emqx_ds:stream(),
progress := progress(),
use_finished := boolean()
}.
-type progress() ::
#{
iterator := emqx_ds:iterator()
}.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [agent_stream_progress()]
}.
-type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t()
agent := emqx_persistent_session_ds_shared_subs_agent:t(),
scheduled_actions := #{
share_topic_filter() => scheduled_action()
}
}.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
@ -34,184 +95,90 @@
-define(rank_x, rank_shared).
-define(rank_y, 0).
-export_type([
progress/0,
agent_stream_progress/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% new
-spec new(opts()) -> t().
new(Opts) ->
#{
agent => emqx_persistent_session_ds_shared_subs_agent:new(
agent_opts(Opts)
)
),
scheduled_actions => #{}
}.
%%--------------------------------------------------------------------
%% open
-spec open(emqx_persistent_session_ds_state:t(), opts()) ->
{ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) ->
open(S0, Opts) ->
SharedSubscriptions = fold_shared_subs(
fun(#share{} = TopicFilter, Sub, Acc) ->
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S0, Sub)} | Acc]
end,
[],
S
S0
),
Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts)
),
SharedSubS = #{agent => Agent},
{ok, S, SharedSubS}.
SharedSubS = #{agent => Agent, scheduled_actions => #{}},
S1 = revoke_all_streams(S0),
{ok, S1, SharedSubS}.
%%--------------------------------------------------------------------
%% on_subscribe
-spec on_subscribe(
share_topic_filter(),
emqx_types:subopts(),
emqx_persistent_session_ds:session()
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) ->
case lookup(TopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent0, TopicFilter
),
SharedSubS = SharedSubS0#{agent => Agent1},
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
{ok, S, SharedSubS, Subscription}
end.
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
-spec on_streams_replayed(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replayed(S, #{agent := Agent0} = SharedSubS0) ->
%% TODO
%% Is it sufficient for a report?
Progress = fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) ->
#srs{it_begin = BeginIt} = SRS,
StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream,
iterator => BeginIt
},
[StreamProgress | Acc]
end,
[],
S
),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progress
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(_S, _SharedSubS) ->
%% TODO
#{}.
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% on_subscribe internal functions
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% Optimize or cache
TopicFilters = fold_shared_subs(
fun
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => TopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case TopicFilters of
#{SubId := TopicFilter} ->
Fun(TopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
#{max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
true ->
create_new_subscription(TopicFilter, SubOpts, Session);
create_new_subscription(ShareTopicFilter, SubOpts, Session);
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props
create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{
id := SessionId,
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) ->
case
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent, ShareTopicFilter, SubOpts
)
of
{ok, Agent1} ->
ok ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
_ = emqx_external_broker:add_persistent_shared_route(TopicFilter, Group, SessionId),
#{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -227,20 +194,20 @@ create_new_subscription(TopicFilter, SubOpts, #{
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
ShareTopicFilter, Subscription, S3
),
SharedSubS = SharedSubS0#{agent => Agent1},
?tp(persistent_session_ds_shared_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
Error
end.
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}) ->
update_subscription(
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}
) ->
#{upgrade_qos := UpgradeQoS} = Props,
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
@ -254,36 +221,173 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
{ok, S, SharedSubS}
end.
lookup(TopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
ShareTopicFilter,
SubOpts
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
?tp(warning, shared_subs_schedule_subscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => {?schedule_subscribe, SubOpts},
old_action => format_schedule_action(ScheduledAction)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
?tp(warning, shared_subs_schedule_subscribe_new, #{
share_topic_filter => ShareTopicFilter, subopts => SubOpts
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, ShareTopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
share_topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(
SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0
) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
undefined
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
#{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
_ = emqx_external_broker:delete_persistent_shared_route(TopicFilter, Group, SessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{
session_id = SessionId, group = Group
}),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}
end.
%%--------------------------------------------------------------------
%% on_unsubscribe internal functions
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, ShareTopicFilter
) ->
case ScheduledActions0 of
#{ShareTopicFilter := ScheduledAction0} ->
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => ScheduledAction1
},
?tp(warning, shared_subs_schedule_unsubscribe_override, #{
share_topic_filter => ShareTopicFilter,
new_type => ?schedule_unsubscribe,
old_action => format_schedule_action(ScheduledAction0)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId),
ScheduledActions1 = ScheduledActions0#{
ShareTopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamKeys,
progresses => []
}
},
?tp(warning, shared_subs_schedule_unsubscribe_new, #{
share_topic_filter => ShareTopicFilter,
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
%%--------------------------------------------------------------------
%% pre_renew_streams
-spec pre_renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
pre_renew_streams(S, SharedSubS) ->
on_streams_replay(S, SharedSubS).
%%--------------------------------------------------------------------
%% renew_streams
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0
),
StreamLeaseEvents =/= [] andalso
?tp(warning, shared_subs_new_stream_lease_events, #{
stream_lease_events => format_lease_events(StreamLeaseEvents)
}),
S1 = lists:foldl(
fun
(#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end,
S0,
StreamLeaseEvents
),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% renew_streams internal functions
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replaying it. We won't use it anyway:
%% * if subscribe is pending, we will reset agent obtain a new lease
%% * if unsubscribe is pending, we will drop connection
case ScheduledActions of
#{ShareTopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
end.
accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
#{
share_topic_filter := ShareTopicFilter,
stream := Stream,
progress := #{iterator := Iterator} = _Progress
} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
%% and should not have passed this stream as a new one
error(new_stream_without_sub);
%% We unsubscribed
S0;
#{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
NeedCreateStream =
case emqx_persistent_session_ds_state:get_stream(Key, S0) of
undefined ->
true;
#srs{unsubscribed = true} ->
true;
_SRS ->
false
end,
case NeedCreateStream of
true ->
NewSRS =
#srs{
rank_x = ?rank_x,
@ -294,15 +398,15 @@ accept_stream(
},
S1 = emqx_persistent_session_ds_state:put_stream(Key, NewSRS, S0),
S1;
_SRS ->
false ->
S0
end
end.
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
@ -320,19 +424,363 @@ revoke_stream(
end
end.
-spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
%%--------------------------------------------------------------------
%% on_streams_replay
-spec on_streams_replay(
emqx_persistent_session_ds_state:t(),
t()
) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S0, SharedSubS0) ->
{S1, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS1} =
renew_streams(S0, SharedSubS0),
Progresses = all_stream_progresses(S1, Agent0),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses
),
{Agent2, ScheduledActions1} = run_scheduled_actions(S1, Agent1, ScheduledActions0),
SharedSubS2 = SharedSubS1#{
agent => Agent2,
scheduled_actions => ScheduledActions1
},
{S1, SharedSubS2}.
%%--------------------------------------------------------------------
%% on_streams_replay internal functions
all_stream_progresses(S, Agent) ->
all_stream_progresses(S, Agent, _NeedUnacked = false).
all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
of
true ->
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with(
ShareTopicFilter,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
);
false ->
ProgressesAcc0
end
end,
#{},
S
).
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
ScheduledActions
).
run_scheduled_action(
S,
Agent0,
ShareTopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
emqx_persistent_session_ds_shared_subs_agent:subscription().
to_agent_subscription(_S, Subscription) ->
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
case StreamKeysToWait1 of
[] ->
?tp(warning, shared_subs_schedule_action_complete, #{
share_topic_filter => ShareTopicFilter,
progresses => format_stream_progresses(Progresses1),
type => Type
}),
%% Regular progress won't se unsubscribed streams, so we need to
%% send the progress explicitly.
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, #{ShareTopicFilter => Progresses1}
),
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent1, ShareTopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent1, ShareTopicFilter, Progresses1
)}
end;
_ ->
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
?tp(warning, shared_subs_schedule_action_continue, #{
share_topic_filter => ShareTopicFilter,
new_action => format_schedule_action(Action1)
}),
{continue, Action1}
end.
filter_unfinished_streams(S, StreamKeysToWait) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:filter(
fun(Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined ->
%% This should not happen: we should see any stream
%% in completed state before deletion
true;
SRS ->
not is_stream_fully_acked(CommQos1, CommQos2, SRS)
end
end,
StreamKeysToWait
).
stream_progresses(S, StreamKeys) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
lists:map(
fun({_SubId, Stream} = Key) ->
SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
stream_progress(CommQos1, CommQos2, Stream, SRS)
end,
StreamKeys
).
%%--------------------------------------------------------------------
%% on_disconnect
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0),
Progresses = all_stream_progresses(S1, Agent0, _NeedUnacked = true),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}.
%%--------------------------------------------------------------------
%% on_disconnect helpers
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(ShareTopicFilter, Stream, _SRS, S) ->
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
end,
S0,
S0
).
%%--------------------------------------------------------------------
%% on_info
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
{emqx_persistent_session_ds_state:t(), t()}.
on_info(S, #{agent := Agent0} = SharedSubS0, Info) ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_info(Agent0, Info),
SharedSubS1 = SharedSubS0#{agent => Agent1},
{S, SharedSubS1}.
%%--------------------------------------------------------------------
%% to_map
-spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map().
to_map(S, _SharedSubS) ->
fold_shared_subs(
fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end,
#{},
S
).
%%--------------------------------------------------------------------
%% cold_get_subscription
-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) ->
emqx_persistent_session_ds:subscription() | undefined.
cold_get_subscription(SessionId, ShareTopicFilter) ->
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of
[Sub = #{current_state := SStateId}] ->
case
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
of
[#{subopts := Subopts}] ->
Sub#{subopts => Subopts};
_ ->
undefined
end;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Generic helpers
%%--------------------------------------------------------------------
lookup(ShareTopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
Sub#{subopts => SubOpts};
undefined ->
undefined
end;
undefined ->
undefined
end.
stream_keys_by_sub_id(S, MatchSubId) ->
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
case SubId of
MatchSubId ->
[StreamKey | StreamKeys];
_ ->
StreamKeys
end
end,
[],
S
).
stream_progress(
CommQos1,
CommQos2,
Stream,
#srs{
it_end = EndIt,
it_begin = BeginIt
} = SRS
) ->
Iterator =
case is_stream_fully_acked(CommQos1, CommQos2, SRS) of
true -> EndIt;
false -> BeginIt
end,
#{
stream => Stream,
progress => #{
iterator => Iterator
},
use_finished => is_use_finished(SRS)
}.
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
S
).
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% do we need anything from sub state?
%% Optimize or cache
ShareTopicFilters = fold_shared_subs(
fun
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => ShareTopicFilter};
(_, _, Acc0) ->
Acc0
end,
#{},
S
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case ShareTopicFilters of
#{SubId := ShareTopicFilter} ->
Fun(ShareTopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end
end,
Acc,
S
).
to_agent_subscription(_S, Subscription) ->
maps:with([start_time], Subscription).
-spec agent_opts(opts()) -> emqx_persistent_session_ds_shared_subs_agent:opts().
agent_opts(#{session_id := SessionId}) ->
#{session_id => SessionId}.
-dialyzer({nowarn_function, now_ms/0}).
now_ms() ->
erlang:system_time(millisecond).
is_use_finished(#srs{unsubscribed = Unsubscribed}) ->
Unsubscribed.
is_stream_started(CommQos1, CommQos2, #srs{first_seqno_qos1 = Q1, last_seqno_qos1 = Q2}) ->
(CommQos1 >= Q1) or (CommQos2 >= Q2).
is_stream_fully_acked(_, _, #srs{
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
}) ->
%% Streams where the last chunk doesn't contain any QoS1 and 2
%% messages are considered fully acked:
true;
is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
(Comm1 >= S1) andalso (Comm2 >= S2).
%%--------------------------------------------------------------------
%% Formatters
%%--------------------------------------------------------------------
format_schedule_action(#{
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
}) ->
#{
type => Type,
progresses => format_stream_progresses(Progresses),
stream_keys_to_wait => format_stream_keys(StreamKeysToWait)
}.
format_stream_progresses(Streams) ->
lists:map(
fun format_stream_progress/1,
Streams
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key(beginning) -> beginning;
format_stream_key({SubId, Stream}) -> {SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
format_opaque(Opaque) ->
erlang:phash2(Opaque).

View File

@ -15,7 +15,7 @@
}.
-type t() :: term().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
session_id := session_id()
@ -28,41 +28,44 @@
-type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream()
}.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
iterator := emqx_ds:iterator(),
use_finished := boolean()
}.
-export_type([
t/0,
subscription/0,
session_id/0,
stream_lease/0,
stream_lease_event/0,
opts/0
]).
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -77,12 +80,13 @@
%%--------------------------------------------------------------------
-callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, term()}.
-callback on_unsubscribe(t(), topic_filter()) -> t().
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t().
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t().
%%--------------------------------------------------------------------
@ -93,24 +97,31 @@
new(Opts) ->
?shared_subs_agent:new(Opts).
-spec open([{topic_filter(), subscription()}], opts()) -> t().
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) ->
{ok, t()} | {error, emqx_types:reason_code()}.
on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter()) -> t().
on_unsubscribe(Agent, TopicFilter) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
-spec renew_streams(t()) -> {[stream_lease_event()], t()}.
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), [stream_progress()]) -> t().
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).

View File

@ -9,11 +9,13 @@
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
@ -30,10 +32,16 @@ new(_Opts) ->
open(_Topics, _Opts) ->
undefined.
on_subscribe(_Agent, _TopicFilter, _SubOpts) ->
can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
on_unsubscribe(Agent, _TopicFilter) ->
on_subscribe(Agent, _TopicFilter, _SubOpts) ->
Agent.
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
Agent.
on_disconnect(Agent, _) ->
Agent.
renew_streams(Agent) ->

View File

@ -399,7 +399,9 @@ new_id(Rec) ->
get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
-spec cold_get_subscription(
emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share()
) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).

View File

@ -21,7 +21,7 @@
-record(ps_route, {
topic :: binary(),
dest :: emqx_persistent_session_ds:id() | '_'
dest :: emqx_persistent_session_ds_router:dest() | '_'
}).
-record(ps_routeidx, {

View File

@ -21,6 +21,7 @@
%% Till full implementation we need to dispach to the null agent.
%% It will report "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% end of -ifdef(TEST).
-endif.

View File

@ -391,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(emqx_message_transformation, _SuiteOpts) ->
#{schema_mod => emqx_message_transformation_schema, config => #{}};
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
default_appspec(_, _) ->
#{}.

View File

@ -21,3 +21,6 @@
-define(METRIC_NAME, cluster_link).
-define(route_metric, 'routes').
-define(PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID),
<<"$sp/", Group/binary, "/", ID/binary, "/", Topic/binary>>
).

View File

@ -16,6 +16,8 @@
delete_shared_route/2,
add_persistent_route/2,
delete_persistent_route/2,
add_persistent_shared_route/3,
delete_persistent_shared_route/3,
forward/1
]).
@ -71,6 +73,16 @@ add_persistent_route(Topic, ID) ->
delete_persistent_route(Topic, ID) ->
maybe_push_route_op(delete, Topic, ?PERSISTENT_ROUTE_ID(Topic, ID), push_persistent_route).
add_persistent_shared_route(Topic, Group, ID) ->
maybe_push_route_op(
add, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
).
delete_persistent_shared_route(Topic, Group, ID) ->
maybe_push_route_op(
delete, Topic, ?PERSISTENT_SHARED_ROUTE_ID(Topic, Group, ID), push_persistent_route
).
forward(#delivery{message = #message{extra = #{link_origin := _}}}) ->
%% Do not forward any external messages to other links.
%% Only forward locally originated messages to all the relevant links, i.e. no gossip

View File

@ -3,6 +3,7 @@
%%--------------------------------------------------------------------
-module(emqx_cluster_link_router_bootstrap).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_router.hrl").
-include_lib("emqx/include/emqx_shared_sub.hrl").
-include_lib("emqx/src/emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
@ -67,7 +68,7 @@ routes_by_topic(Topics, _IsPersistentRoute = true) ->
lists:foldl(
fun(T, Acc) ->
Routes = emqx_persistent_session_ds_router:lookup_routes(T),
[encode_route(T, ?PERSISTENT_ROUTE_ID(T, D)) || #ps_route{dest = D} <- Routes] ++ Acc
[encode_route(T, ps_route_id(PSRoute)) || #ps_route{} = PSRoute <- Routes] ++ Acc
end,
[],
Topics
@ -79,17 +80,22 @@ routes_by_wildcards(Wildcards, _IsPersistentRoute = false) ->
Routes ++ SharedRoutes;
routes_by_wildcards(Wildcards, _IsPersistentRoute = true) ->
emqx_persistent_session_ds_router:foldl_routes(
fun(#ps_route{dest = D, topic = T}, Acc) ->
fun(#ps_route{topic = T} = PSRoute, Acc) ->
case topic_intersect_any(T, Wildcards) of
false ->
Acc;
Intersec ->
[encode_route(Intersec, ?PERSISTENT_ROUTE_ID(T, D)) | Acc]
[encode_route(Intersec, ps_route_id(PSRoute)) | Acc]
end
end,
[]
).
ps_route_id(#ps_route{topic = T, dest = #share_dest{group = Group, session_id = SessionId}}) ->
?PERSISTENT_SHARED_ROUTE_ID(T, Group, SessionId);
ps_route_id(#ps_route{topic = T, dest = SessionId}) ->
?PERSISTENT_ROUTE_ID(T, SessionId).
select_routes_by_topics(Topics) ->
[encode_route(Topic, Topic) || Topic <- Topics, emqx_broker:subscribers(Topic) =/= []].

View File

@ -4,10 +4,14 @@ This application makes durable session capable to cooperatively replay messages
# General layout and interaction with session
The general idea is described in the [EIP 0028](https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md).
On the code level, the application is organized in the following way:
![General layout](docs/images/ds_shared_subs.png)
* The nesting reflects nesting/ownership of entity states.
* The bold arrow represent the [most complex interaction](https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md#shared-subscription-session-handler), between session-side group subscription state machine and the shared subscription leader.
* The bold arrow represent the [most complex interaction](https://github.com/emqx/eip/blob/main/active/0028-durable-shared-subscriptions.md#shared-subscription-session-handler), between session-side group subscription state machine (**GroupSM**) and the shared subscription leader (**Leader**).
# Contributing

View File

@ -6,83 +6,218 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_ds_shared_sub_proto.hrl").
-include("emqx_ds_shared_sub_config.hrl").
-export([
new/1,
open/2,
can_subscribe/3,
on_subscribe/3,
on_unsubscribe/2,
on_unsubscribe/3,
on_stream_progress/2,
on_info/2,
on_disconnect/2,
renew_streams/1
]).
-behaviour(emqx_persistent_session_ds_shared_subs_agent).
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group_id() :: share_topic_filter().
-type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-type external_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
progress => progress(),
share_topic_filter => emqx_persistent_session_ds:share_topic_filter()
}
| #{
type => revoke,
stream => emqx_ds:stream(),
share_topic_filter => emqx_persistent_session_ds:share_topic_filter()
}.
-type options() :: #{
session_id := emqx_persistent_session_ds:id()
}.
-type t() :: #{
groups := #{
group_id() => emqx_ds_shared_sub_group_sm:t()
},
session_id := emqx_persistent_session_ds:id()
}.
%% We speak in the terms of share_topic_filter in the module API
%% which is consumed by persistent session.
%%
%% We speak in the terms of group_id internally:
%% * to identfy shared subscription's group_sm in the state;
%% * to addres agent's group_sm while communicating with leader.
%% * to identify the leader itself.
%%
%% share_topic_filter should be uniquely determined by group_id. See MQTT 5.0 spec:
%%
%% > Note that "$share/consumer1//finance" and "$share/consumer1/sport/tennis/+"
%% > are distinct shared subscriptions, even though they have the same ShareName.
%% > While they might be related in some way, no specific relationship between them
%% > is implied by them having the same ShareName.
%%
%% So we just use the full share_topic_filter record as group_id.
-define(group_id(ShareTopicFilter), ShareTopicFilter).
-define(share_topic_filter(GroupId), GroupId).
-record(message_to_group_sm, {
group :: emqx_types:group(),
group_id :: group_id(),
message :: term()
}).
-export_type([
t/0,
group_id/0,
options/0,
external_lease_event/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new(options()) -> t().
new(Opts) ->
init_state(Opts).
-spec open([{share_topic_filter(), emqx_types:subopts()}], options()) -> t().
open(TopicSubscriptions, Opts) ->
State0 = init_state(Opts),
State1 = lists:foldl(
fun({ShareTopicFilter, #{}}, State) ->
add_group_subscription(State, ShareTopicFilter)
?tp(warning, ds_agent_open_subscription, #{
topic_filter => ShareTopicFilter
}),
add_shared_subscription(State, ShareTopicFilter)
end,
State0,
TopicSubscriptions
),
State1.
on_subscribe(State0, TopicFilter, _SubOpts) ->
State1 = add_group_subscription(State0, TopicFilter),
{ok, State1}.
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) ->
ok | {error, emqx_types:reason_code()}.
can_subscribe(_State, _ShareTopicFilter, _SubOpts) ->
case ?dq_config(enable) of
true -> ok;
false -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}
end.
on_unsubscribe(State, TopicFilter) ->
delete_group_subscription(State, TopicFilter).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
?tp(warning, ds_agent_on_subscribe, #{
share_topic_filter => ShareTopicFilter
}),
add_shared_subscription(State0, ShareTopicFilter).
-spec on_unsubscribe(t(), share_topic_filter(), [
emqx_persistent_session_ds_shared_subs:agent_stream_progress()
]) -> t().
on_unsubscribe(State, ShareTopicFilter, GroupProgress) ->
delete_shared_subscription(State, ShareTopicFilter, GroupProgress).
-spec renew_streams(t()) ->
{[emqx_persistent_session_ds_shared_subs_agent:stream_lease_event()], t()}.
renew_streams(#{} = State) ->
fetch_stream_events(State).
on_stream_progress(State, _StreamProgress) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% Send to leader
State.
-spec on_stream_progress(t(), #{
share_topic_filter() => [emqx_persistent_session_ds_shared_subs:agent_stream_progress()]
}) -> t().
on_stream_progress(State, StreamProgresses) ->
maps:fold(
fun(ShareTopicFilter, GroupProgresses, StateAcc) ->
with_group_sm(StateAcc, ?group_id(ShareTopicFilter), fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses)
end)
end,
State,
StreamProgresses
).
on_info(State, ?leader_lease_streams_match(Group, StreamProgresses, Version)) ->
-spec on_disconnect(t(), [emqx_persistent_session_ds_shared_subs:agent_stream_progress()]) -> t().
on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
ok = maps:foreach(
fun(GroupId, GroupSM0) ->
GroupProgresses = maps:get(?share_topic_filter(GroupId), StreamProgresses, []),
emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses)
end,
Groups0
),
State#{groups => #{}}.
-spec on_info(t(), term()) -> t().
on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) ->
?SLOG(info, #{
msg => leader_lease_streams,
group => Group,
group_id => GroupId,
streams => StreamProgresses,
version => Version
version => Version,
leader => Leader
}),
with_group_sm(State, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(GSM, StreamProgresses, Version)
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(
GSM, Leader, StreamProgresses, Version
)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
?SLOG(info, #{
msg => leader_renew_stream_lease,
group => Group,
group_id => GroupId,
version => Version
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
end);
on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) ->
?SLOG(info, #{
msg => leader_renew_stream_lease,
group_id => GroupId,
version_old => VersionOld,
version_new => VersionNew
}),
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
end);
on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) ->
?SLOG(info, #{
msg => leader_update_streams,
group_id => GroupId,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew
}),
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_update_streams(
GSM, VersionOld, VersionNew, StreamsNew
)
end);
on_info(State, ?leader_invalidate_match(GroupId)) ->
?SLOG(info, #{
msg => leader_invalidate,
group_id => GroupId
}),
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_invalidate(GSM)
end);
%% Generic messages sent by group_sm's to themselves (timeouts).
on_info(State, #message_to_group_sm{group = Group, message = Message}) ->
with_group_sm(State, Group, fun(GSM) ->
on_info(State, #message_to_group_sm{group_id = GroupId, message = Message}) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_info(GSM, Message)
end).
@ -97,23 +232,30 @@ init_state(Opts) ->
groups => #{}
}.
delete_group_subscription(State, _ShareTopicFilter) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
State.
delete_shared_subscription(State, ShareTopicFilter, GroupProgress) ->
GroupId = ?group_id(ShareTopicFilter),
case State of
#{groups := #{GroupId := GSM} = Groups} ->
_ = emqx_ds_shared_sub_group_sm:handle_disconnect(GSM, GroupProgress),
State#{groups => maps:remove(GroupId, Groups)};
_ ->
State
end.
add_group_subscription(
#{groups := Groups0} = State0, ShareTopicFilter
add_shared_subscription(
#{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
) ->
?SLOG(info, #{
msg => agent_add_group_subscription,
topic_filter => ShareTopicFilter
msg => agent_add_shared_subscription,
share_topic_filter => ShareTopicFilter
}),
#share{group = Group} = ShareTopicFilter,
GroupId = ?group_id(ShareTopicFilter),
Groups1 = Groups0#{
Group => emqx_ds_shared_sub_group_sm:new(#{
topic_filter => ShareTopicFilter,
agent => this_agent(),
send_after => send_to_subscription_after(Group)
GroupId => emqx_ds_shared_sub_group_sm:new(#{
session_id => SessionId,
share_topic_filter => ShareTopicFilter,
agent => this_agent(SessionId),
send_after => send_to_subscription_after(GroupId)
})
},
State1 = State0#{groups => Groups1},
@ -121,9 +263,9 @@ add_group_subscription(
fetch_stream_events(#{groups := Groups0} = State0) ->
{Groups1, Events} = maps:fold(
fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) ->
fun(GroupId, GroupSM0, {GroupsAcc, EventsAcc}) ->
{GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0),
{GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]}
{GroupsAcc#{GroupId => GroupSM1}, [Events | EventsAcc]}
end,
{#{}, []},
Groups0
@ -131,26 +273,23 @@ fetch_stream_events(#{groups := Groups0} = State0) ->
State1 = State0#{groups => Groups1},
{lists:concat(Events), State1}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
this_agent(Id) ->
emqx_ds_shared_sub_proto:agent(Id, self()).
this_agent() -> self().
send_to_subscription_after(Group) ->
send_to_subscription_after(GroupId) ->
fun(Time, Msg) ->
emqx_persistent_session_ds_shared_subs_agent:send_after(
Time,
self(),
#message_to_group_sm{group = Group, message = Msg}
#message_to_group_sm{group_id = GroupId, message = Msg}
)
end.
with_group_sm(State, Group, Fun) ->
with_group_sm(State, GroupId, Fun) ->
case State of
#{groups := #{Group := GSM0} = Groups} ->
GSM1 = Fun(GSM0),
State#{groups => Groups#{Group => GSM1}};
#{groups := #{GroupId := GSM0} = Groups} ->
#{} = GSM1 = Fun(GSM0),
State#{groups => Groups#{GroupId => GSM1}};
_ ->
%% TODO
%% Error?

View File

@ -0,0 +1,218 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
%% Swagger specs from hocon schema
-export([
api_spec/0,
paths/0,
schema/1,
namespace/0
]).
-export([
fields/1,
roots/0
]).
-define(TAGS, [<<"Durable Queues">>]).
%% API callbacks
-export([
'/durable_queues'/2,
'/durable_queues/:id'/2
]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
-import(emqx_dashboard_swagger, [error_codes/2]).
namespace() -> "durable_queues".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/durable_queues",
"/durable_queues/:id"
].
-define(NOT_FOUND, 'NOT_FOUND').
schema("/durable_queues") ->
#{
'operationId' => '/durable_queues',
get => #{
tags => ?TAGS,
summary => <<"List declared durable queues">>,
description => ?DESC("durable_queues_get"),
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
durable_queues_get(),
durable_queues_get_example()
)
}
}
};
schema("/durable_queues/:id") ->
#{
'operationId' => '/durable_queues/:id',
get => #{
tags => ?TAGS,
summary => <<"Get a declared durable queue">>,
description => ?DESC("durable_queue_get"),
parameters => [param_queue_id()],
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
durable_queue_get(),
durable_queue_get_example()
),
404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
}
},
delete => #{
tags => ?TAGS,
summary => <<"Delete a declared durable queue">>,
description => ?DESC("durable_queue_delete"),
parameters => [param_queue_id()],
responses => #{
200 => <<"Queue deleted">>,
404 => error_codes([?NOT_FOUND], <<"Queue Not Found">>)
}
},
put => #{
tags => ?TAGS,
summary => <<"Declare a durable queue">>,
description => ?DESC("durable_queues_put"),
parameters => [param_queue_id()],
'requestBody' => durable_queue_put(),
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
durable_queue_get(),
durable_queue_get_example()
)
}
}
}.
'/durable_queues'(get, _Params) ->
{200, queue_list()}.
'/durable_queues/:id'(get, Params) ->
case queue_get(Params) of
{ok, Queue} -> {200, Queue};
not_found -> serialize_error(not_found)
end;
'/durable_queues/:id'(delete, Params) ->
case queue_delete(Params) of
ok -> {200, <<"Queue deleted">>};
not_found -> serialize_error(not_found)
end;
'/durable_queues/:id'(put, Params) ->
{200, queue_put(Params)}.
%%--------------------------------------------------------------------
%% Actual handlers: stubs
%%--------------------------------------------------------------------
queue_list() ->
persistent_term:get({?MODULE, queues}, []).
queue_get(#{bindings := #{id := ReqId}}) ->
case [Q || #{id := Id} = Q <- queue_list(), Id =:= ReqId] of
[Queue] -> {ok, Queue};
[] -> not_found
end.
queue_delete(#{bindings := #{id := ReqId}}) ->
Queues0 = queue_list(),
Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
persistent_term:put({?MODULE, queues}, Queues1),
case Queues0 =:= Queues1 of
true -> not_found;
false -> ok
end.
queue_put(#{bindings := #{id := ReqId}}) ->
Queues0 = queue_list(),
Queues1 = [Q || #{id := Id} = Q <- Queues0, Id =/= ReqId],
NewQueue = #{
id => ReqId
},
Queues2 = [NewQueue | Queues1],
persistent_term:put({?MODULE, queues}, Queues2),
NewQueue.
%%--------------------------------------------------------------------
%% Schemas
%%--------------------------------------------------------------------
param_queue_id() ->
{
id,
mk(binary(), #{
in => path,
desc => ?DESC(param_queue_id),
required => true,
validator => fun validate_queue_id/1
})
}.
validate_queue_id(Id) ->
case emqx_topic:words(Id) of
[Segment] when is_binary(Segment) -> true;
_ -> {error, <<"Invalid queue id">>}
end.
durable_queues_get() ->
hoconsc:array(ref(durable_queue_get)).
durable_queue_get() ->
ref(durable_queue_get).
durable_queue_put() ->
map().
roots() -> [].
fields(durable_queue_get) ->
[
{id, mk(binary(), #{})}
].
%%--------------------------------------------------------------------
%% Examples
%%--------------------------------------------------------------------
durable_queue_get_example() ->
#{
id => <<"queue1">>
}.
durable_queues_get_example() ->
[
#{
id => <<"queue1">>
},
#{
id => <<"queue2">>
}
].
%%--------------------------------------------------------------------
%% Error codes
%%--------------------------------------------------------------------
serialize_error(not_found) ->
{404, #{
code => <<"NOT_FOUND">>,
message => <<"Queue Not Found">>
}}.

View File

@ -15,9 +15,11 @@
-spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) ->
ok = emqx_ds_shared_sub_config:load(),
{ok, Sup} = emqx_ds_shared_sub_sup:start_link(),
{ok, Sup}.
-spec stop(term()) -> ok.
stop(_State) ->
ok = emqx_ds_shared_sub_config:unload(),
ok.

View File

@ -0,0 +1,84 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_config).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-type update_request() :: emqx_config:config().
%% callbacks for emqx_config_handler
-export([
pre_config_update/3,
post_config_update/5
]).
%% callbacks for emqx_config_backup
-export([
import_config/1
]).
%% API
-export([
load/0,
unload/0,
get/1
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_conf:add_handler([durable_queues], ?MODULE).
-spec unload() -> ok.
unload() ->
ok = emqx_conf:remove_handler([durable_queues]).
-spec get(atom() | [atom()]) -> term().
get(Name) when is_atom(Name) ->
emqx_config:get([durable_queues, Name]);
get(Name) when is_list(Name) ->
emqx_config:get([durable_queues | Name]).
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()}.
pre_config_update([durable_queues | _], NewConfig, _OldConfig) ->
{ok, NewConfig}.
-spec post_config_update(
list(atom()),
update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs()
) ->
ok.
post_config_update([durable_queues | _], _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
import_config(#{<<"durable_queues">> := DQConf}) ->
OldDQConf = emqx:get_raw_config([durable_queues], #{}),
NewDQConf = maps:merge(OldDQConf, DQConf),
case emqx_conf:update([durable_queues], NewDQConf, #{override_to => cluster}) of
{ok, #{raw_config := NewRawConf}} ->
Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, DQConf)),
ChangedPaths = [[durable_queues, K] || K <- maps:keys(Changed)],
{ok, #{root_key => durable_queues, changed => ChangedPaths}};
Error ->
{error, #{root_key => durable_queues, reason => Error}}
end;
import_config(_) ->
{ok, #{root_key => durable_queues, changed => []}}.

View File

@ -0,0 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-define(dq_config(Path), emqx_ds_shared_sub_config:get(Path)).

View File

@ -10,73 +10,113 @@
-module(emqx_ds_shared_sub_group_sm).
-include_lib("emqx/include/logger.hrl").
-include("emqx_ds_shared_sub_config.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
new/1,
%% Leader messages
handle_leader_lease_streams/3,
handle_leader_lease_streams/4,
handle_leader_renew_stream_lease/2,
handle_leader_renew_stream_lease/3,
handle_leader_update_streams/4,
handle_leader_invalidate/1,
%% Self-initiated messages
handle_info/2,
%% API
fetch_stream_events/1
fetch_stream_events/1,
handle_stream_progress/2,
handle_disconnect/2
]).
-export_type([
t/0,
options/0,
state/0
]).
-type options() :: #{
session_id := emqx_persistent_session_ds:id(),
agent := emqx_ds_shared_sub_proto:agent(),
topic_filter := emqx_persistent_session_ds:share_topic_filter(),
share_topic_filter := emqx_persistent_session_ds:share_topic_filter(),
send_after := fun((non_neg_integer(), term()) -> reference())
}.
%% Subscription states
-type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-type stream_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
progress => progress()
}
| #{
type => revoke,
stream => emqx_ds:stream()
}.
%% GroupSM States
-define(connecting, connecting).
-define(replaying, replaying).
-define(updating, updating).
-define(disconnected, disconnected).
-type state() :: ?connecting | ?replaying | ?updating.
-type state() :: ?connecting | ?replaying | ?updating | ?disconnected.
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
state => state(),
state_data => map(),
state_timers => map()
-type connecting_data() :: #{}.
-type replaying_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => progress()},
version => emqx_ds_shared_sub_proto:version(),
prev_version => undefined
}.
-type updating_data() :: #{
leader => emqx_ds_shared_sub_proto:leader(),
streams => #{emqx_ds:stream() => progress()},
version => emqx_ds_shared_sub_proto:version(),
prev_version => emqx_ds_shared_sub_proto:version()
}.
-type state_data() :: connecting_data() | replaying_data() | updating_data().
-record(state_timeout, {
id :: reference(),
name :: atom(),
message :: term()
}).
-record(timer, {
ref :: reference(),
id :: reference()
}).
%%-----------------------------------------------------------------------
%% Constants
%%-----------------------------------------------------------------------
-type timer_name() :: atom().
-type timer() :: #timer{}.
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(FIND_LEADER_TIMEOUT, 1000).
-define(RENEW_LEASE_TIMEOUT, 2000).
-type t() :: #{
share_topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
stream_lease_events => list(stream_lease_event()),
state => state(),
state_data => state_data(),
state_timers => #{timer_name() => timer()}
}.
%%-----------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------
-spec new(options()) -> group_sm().
-spec new(options()) -> t().
new(#{
session_id := SessionId,
agent := Agent,
topic_filter := ShareTopicFilter,
share_topic_filter := ShareTopicFilter,
send_after := SendAfter
}) ->
?SLOG(
@ -84,37 +124,49 @@ new(#{
#{
msg => group_sm_new,
agent => Agent,
topic_filter => ShareTopicFilter
share_topic_filter => ShareTopicFilter
}
),
GSM0 = #{
topic_filter => ShareTopicFilter,
id => SessionId,
share_topic_filter => ShareTopicFilter,
agent => Agent,
send_after => SendAfter
},
?tp(warning, group_sm_new, #{
agent => Agent,
share_topic_filter => ShareTopicFilter
}),
transition(GSM0, ?connecting, #{}).
-spec fetch_stream_events(t()) ->
{t(), [emqx_ds_shared_sub_agent:external_lease_event()]}.
fetch_stream_events(
#{
state := ?replaying,
topic_filter := TopicFilter,
state_data := #{stream_lease_events := Events0} = Data
state := _State,
share_topic_filter := ShareTopicFilter,
stream_lease_events := Events0
} = GSM
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
Event#{share_topic_filter => ShareTopicFilter}
end,
Events0
),
{
GSM#{
state_data => Data#{stream_lease_events => []}
},
Events1
};
fetch_stream_events(GSM) ->
{GSM, []}.
{GSM#{stream_lease_events => []}, Events1}.
-spec handle_disconnect(t(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> t().
handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) ->
transition(GSM, ?disconnected, #{});
handle_disconnect(
#{agent := Agent, state_data := #{leader := Leader, version := Version} = StateData} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_disconnect(
Leader, Agent, StreamProgresses, Version
),
transition(GSM, ?disconnected, StateData).
%%-----------------------------------------------------------------------
%% Event Handlers
@ -123,89 +175,282 @@ fetch_stream_events(GSM) ->
%%-----------------------------------------------------------------------
%% Connecting state
handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, group_sm_enter_connecting, #{
agent => Agent,
share_topic_filter => ShareTopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)).
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version
#{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM0,
Leader,
StreamProgresses,
Version
) ->
?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
Streams = lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) ->
Acc#{Stream => It}
end,
#{},
StreamProgresses
),
StreamLeaseEvents = lists:map(
fun(#{stream := Stream, iterator := It}) ->
#{
type => lease,
stream => Stream,
iterator => It
}
end,
StreamProgresses
),
?tp(debug, leader_lease_streams, #{share_topic_filter => ShareTopicFilter}),
Streams = progresses_to_map(StreamProgresses),
StreamLeaseEvents = progresses_to_lease_events(StreamProgresses),
transition(
GSM0,
?replaying,
#{
leader => Leader,
streams => Streams,
stream_lease_events => StreamLeaseEvents,
prev_version => undefined,
version => Version
}
},
StreamLeaseEvents
);
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) ->
GSM.
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
handle_find_leader_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM0) ->
?tp(warning, group_sm_find_leader_timeout, #{
agent => Agent,
share_topic_filter => ShareTopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), ShareTopicFilter),
GSM1 = ensure_state_timeout(
GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)
),
GSM1.
%%-----------------------------------------------------------------------
%% Replaying state
handle_replaying(GSM) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
handle_replaying(GSM0) ->
GSM1 = ensure_state_timeout(
GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)
),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
),
GSM2.
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, Version
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
handle_renew_lease_timeout(GSM) ->
?tp(debug, renew_lease_timeout, #{}),
handle_renew_lease_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, renew_lease_timeout, #{agent => Agent, share_topic_filter => ShareTopicFilter}),
transition(GSM, ?connecting, #{}).
%%-----------------------------------------------------------------------
%% Updating state
% handle_updating(GSM) ->
% GSM.
handle_updating(GSM0) ->
GSM1 = ensure_state_timeout(
GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)
),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
),
GSM2.
%%-----------------------------------------------------------------------
%% Disconnected state
handle_disconnected(GSM) ->
GSM.
%%-----------------------------------------------------------------------
%% Common handlers
handle_leader_update_streams(
#{
id := Id,
state := ?replaying,
state_data := #{streams := Streams0, version := VersionOld} = StateData
} = GSM,
VersionOld,
VersionNew,
StreamProgresses
) ->
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
id => Id,
version_old => VersionOld,
version_new => VersionNew,
stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses)
}),
{AddEvents, Streams1} = lists:foldl(
fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
case maps:is_key(Stream, StreamsAcc) of
true ->
%% We prefer our own progress
{AddEventAcc, StreamsAcc};
false ->
{
[#{type => lease, stream => Stream, progress => Progress} | AddEventAcc],
StreamsAcc#{Stream => Progress}
}
end
end,
{[], Streams0},
StreamProgresses
),
NewStreamMap = progresses_to_map(StreamProgresses),
{RevokeEvents, Streams2} = lists:foldl(
fun(Stream, {RevokeEventAcc, StreamsAcc}) ->
case maps:is_key(Stream, NewStreamMap) of
true ->
{RevokeEventAcc, StreamsAcc};
false ->
{
[#{type => revoke, stream => Stream} | RevokeEventAcc],
maps:remove(Stream, StreamsAcc)
}
end
end,
{[], Streams1},
maps:keys(Streams1)
),
StreamLeaseEvents = AddEvents ++ RevokeEvents,
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
id => Id,
stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents)
}),
transition(
GSM,
?updating,
StateData#{
streams => Streams2,
prev_version => VersionOld,
version => VersionNew
},
StreamLeaseEvents
);
handle_leader_update_streams(
#{
state := ?updating,
state_data := #{version := VersionNew} = _StreamData
} = GSM,
_VersionOld,
VersionNew,
_StreamProgresses
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_update_streams(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew, _StreamProgresses
) ->
GSM;
handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) ->
%% Unexpected versions or state
?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{
gsm => GSM,
version_old => VersionOld,
version_new => VersionNew
}),
transition(GSM, ?connecting, #{}).
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, Version
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?updating, state_data := #{version := Version} = StateData} = GSM, Version
) ->
transition(
GSM,
?replaying,
StateData#{prev_version => undefined}
);
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, VersionOld, VersionNew
) when VersionOld =:= Version orelse VersionNew =:= Version ->
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?updating, state_data := #{version := VersionNew, prev_version := VersionOld}} = GSM,
VersionOld,
VersionNew
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew
) ->
GSM;
handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
%% Unexpected versions or state
?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{
gsm => GSM,
version_old => VersionOld,
version_new => VersionNew
}),
transition(GSM, ?connecting, #{}).
-spec handle_stream_progress(t(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) ->
t().
handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
GSM;
handle_stream_progress(
#{
state := ?replaying,
agent := Agent,
state_data := #{
leader := Leader,
version := Version
}
} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, Version
),
ensure_state_timeout(
GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
);
handle_stream_progress(
#{
state := ?updating,
agent := Agent,
state_data := #{
leader := Leader,
version := Version,
prev_version := PrevVersion
}
} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, PrevVersion, Version
),
ensure_state_timeout(
GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
);
handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) ->
GSM.
handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, shared_sub_group_sm_leader_invalidate, #{
agent => Agent,
share_topic_filter => ShareTopicFilter
}),
transition(GSM, ?connecting, #{}).
%%-----------------------------------------------------------------------
%% Internal API
%%-----------------------------------------------------------------------
handle_state_timeout(
#{state := ?connecting, topic_filter := TopicFilter} = GSM,
#{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM,
find_leader_timeout,
_Message
) ->
?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}),
handle_find_leader_timeout(GSM);
handle_state_timeout(
#{state := ?replaying} = GSM,
renew_lease_timeout,
_Message
) ->
handle_renew_lease_timeout(GSM).
handle_renew_lease_timeout(GSM);
handle_state_timeout(
GSM,
update_stream_state_timeout,
_Message
) ->
?tp(debug, update_stream_state_timeout, #{}),
handle_stream_progress(GSM, []).
handle_info(
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info
@ -225,6 +470,9 @@ handle_info(GSM, _Info) ->
%%--------------------------------------------------------------------
transition(GSM0, NewState, NewStateData) ->
transition(GSM0, NewState, NewStateData, []).
transition(GSM0, NewState, NewStateData, LeaseEvents) ->
Timers = maps:get(state_timers, GSM0, #{}),
TimerNames = maps:keys(Timers),
GSM1 = lists:foldl(
@ -237,10 +485,14 @@ transition(GSM0, NewState, NewStateData) ->
GSM2 = GSM1#{
state => NewState,
state_data => NewStateData,
state_timers => #{}
state_timers => #{},
stream_lease_events => LeaseEvents
},
run_enter_callback(GSM2).
agent_metadata(#{id := Id} = _GSM) ->
#{id => Id}.
ensure_state_timeout(GSM0, Name, Delay) ->
ensure_state_timeout(GSM0, Name, Delay, Name).
@ -277,6 +529,29 @@ cancel_timer(GSM, Name) ->
run_enter_callback(#{state := ?connecting} = GSM) ->
handle_connecting(GSM);
run_enter_callback(#{state := ?replaying} = GSM) ->
handle_replaying(GSM).
% run_enter_callback(#{state := ?updating} = GSM) ->
% handle_updating(GSM).
handle_replaying(GSM);
run_enter_callback(#{state := ?updating} = GSM) ->
handle_updating(GSM);
run_enter_callback(#{state := ?disconnected} = GSM) ->
handle_disconnected(GSM).
progresses_to_lease_events(StreamProgresses) ->
lists:map(
fun(#{stream := Stream, progress := Progress}) ->
#{
type => lease,
stream => Stream,
progress => Progress
}
end,
StreamProgresses
).
progresses_to_map(StreamProgresses) ->
lists:foldl(
fun(#{stream := Stream, progress := Progress}, Acc) ->
Acc#{Stream => Progress}
end,
#{},
StreamProgresses
).

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,171 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_leader_rank_progress).
-include_lib("emqx/include/logger.hrl").
-export([
init/0,
set_replayed/2,
add_streams/2,
replayed_up_to/2
]).
%% "shard"
-type rank_x() :: emqx_ds:rank_x().
%% "generation"
-type rank_y() :: emqx_ds:rank_y().
%% shard progress
-type x_progress() :: #{
%% All streams with given rank_x and rank_y =< min_y are replayed.
min_y := rank_y(),
ys := #{
rank_y() => #{
emqx_ds:stream() => _IdReplayed :: boolean()
}
}
}.
-type t() :: #{
rank_x() => x_progress()
}.
-export_type([
t/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec init() -> t().
init() -> #{}.
-spec set_replayed(emqx_ds:stream_rank(), t()) -> t().
set_replayed({{RankX, RankY}, Stream}, State) ->
case State of
#{RankX := #{ys := #{RankY := #{Stream := false} = RankYStreams} = Ys0}} ->
Ys1 = Ys0#{RankY => RankYStreams#{Stream => true}},
{MinY, Ys2} = update_min_y(maps:to_list(Ys1)),
State#{RankX => #{min_y => MinY, ys => Ys2}};
_ ->
?SLOG(
warning,
#{
msg => leader_rank_progress_double_or_invalid_update,
rank_x => RankX,
rank_y => RankY,
state => State
}
),
State
end.
-spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) ->
{[{emqx_ds:stream_rank(), emqx_ds:stream()}], t()}.
add_streams(StreamsWithRanks, State) ->
SortedStreamsWithRanks = lists:sort(
fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) ->
RankY1 =< RankY2
end,
StreamsWithRanks
),
lists:foldl(
fun({Rank, Stream} = StreamWithRank, {StreamAcc, StateAcc0}) ->
case add_stream({Rank, Stream}, StateAcc0) of
{true, StateAcc1} ->
{[StreamWithRank | StreamAcc], StateAcc1};
false ->
{StreamAcc, StateAcc0}
end
end,
{[], State},
SortedStreamsWithRanks
).
-spec replayed_up_to(emqx_ds:rank_x(), t()) -> emqx_ds:rank_y().
replayed_up_to(RankX, State) ->
case State of
#{RankX := #{min_y := MinY}} ->
MinY;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
add_stream({{RankX, RankY}, Stream}, State0) ->
case State0 of
#{RankX := #{min_y := MinY}} when RankY =< MinY ->
false;
#{RankX := #{ys := #{RankY := #{Stream := true}}}} ->
false;
_ ->
XProgress = maps:get(RankX, State0, #{min_y => RankY - 1, ys => #{}}),
Ys0 = maps:get(ys, XProgress),
RankYStreams0 = maps:get(RankY, Ys0, #{}),
RankYStreams1 = RankYStreams0#{Stream => false},
Ys1 = Ys0#{RankY => RankYStreams1},
State1 = State0#{RankX => XProgress#{ys => Ys1}},
{true, State1}
end.
update_min_y([{RankY, RankYStreams} | Rest] = Ys) ->
case {has_unreplayed_streams(RankYStreams), Rest} of
{true, _} ->
{RankY - 1, maps:from_list(Ys)};
{false, []} ->
{RankY - 1, #{}};
{false, _} ->
update_min_y(Rest)
end.
has_unreplayed_streams(RankYStreams) ->
lists:any(
fun(IsReplayed) -> not IsReplayed end,
maps:values(RankYStreams)
).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
add_streams_set_replayed_test() ->
State0 = init(),
{_, State1} = add_streams(
[
{{shard1, 1}, s111},
{{shard1, 1}, s112},
{{shard1, 2}, s121},
{{shard1, 2}, s122},
{{shard1, 3}, s131},
{{shard1, 4}, s141},
{{shard3, 5}, s51}
],
State0
),
?assertEqual(0, replayed_up_to(shard1, State1)),
State2 = set_replayed({{shard1, 1}, s111}, State1),
State3 = set_replayed({{shard1, 3}, s131}, State2),
?assertEqual(0, replayed_up_to(shard1, State3)),
State4 = set_replayed({{shard1, 1}, s112}, State3),
?assertEqual(1, replayed_up_to(shard1, State4)),
State5 = set_replayed({{shard1, 2}, s121}, State4),
State6 = set_replayed({{shard1, 2}, s122}, State5),
?assertEqual(3, replayed_up_to(shard1, State6)),
State7 = set_replayed({{shard1, 4}, s141}, State6),
?assertEqual(3, replayed_up_to(shard1, State7)).
%% -ifdef(TEST) end
-endif.

View File

@ -2,71 +2,286 @@
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% TODO https://emqx.atlassian.net/browse/EMQX-12573
%% This should be wrapped with a proto_v1 module.
%% For simplicity, send as simple OTP messages for now.
-module(emqx_ds_shared_sub_proto).
-include("emqx_ds_shared_sub_proto.hrl").
-export([
agent_connect_leader/3,
agent_update_stream_states/4,
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
leader_lease_streams/4,
leader_renew_stream_lease/3
-export([
agent_connect_leader/4,
agent_update_stream_states/4,
agent_update_stream_states/5,
agent_disconnect/4,
leader_lease_streams/5,
leader_renew_stream_lease/3,
leader_renew_stream_lease/4,
leader_update_streams/5,
leader_invalidate/2
]).
-type agent() :: pid().
-export([
format_stream_progresses/1,
format_stream_progress/1,
format_stream_key/1,
format_stream_keys/1,
format_lease_event/1,
format_lease_events/1,
agent/2
]).
-type agent() :: ?agent(emqx_persistent_session_ds:id(), pid()).
-type leader() :: pid().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group().
-type version() :: non_neg_integer().
-type stream_progress() :: #{
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
-type agent_metadata() :: #{
id := emqx_persistent_session_ds:id()
}.
-type leader_stream_progress() :: #{
stream := emqx_ds:stream(),
progress := emqx_persistent_session_ds_shared_subs:progress()
}.
-type agent_stream_progress() :: emqx_persistent_session_ds_shared_subs:agent_stream_progress().
-export_type([
agent/0,
leader/0,
group/0,
version/0,
stream_progress/0
leader_stream_progress/0,
agent_stream_progress/0,
agent_metadata/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
ok.
-spec agent_connect_leader(leader(), agent(), agent_metadata(), share_topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
type => agent_connect_leader,
to_leader => ToLeader,
from_agent => FromAgent,
agent_metadata => AgentMetadata,
share_topic_filter => ShareTopicFilter
}),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)),
ok;
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
emqx_ds_shared_sub_proto_v1:agent_connect_leader(
?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
).
-spec agent_update_stream_states(leader(), agent(), list(stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
ok.
ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
).
%% ...
-spec agent_update_stream_states(
leader(), agent(), list(agent_stream_progress()), version(), version()
) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version_old => VersionOld,
version_new => VersionNew
}),
_ = erlang:send(
ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
),
ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
).
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
type => agent_disconnect,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
ok;
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_disconnect(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
).
%% leader -> agent messages
-spec leader_lease_streams(agent(), group(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Streams, Version) ->
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_lease_streams(OfGroup, Streams, Version)
),
-spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) ->
ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_lease_streams,
to_agent => ToAgent,
of_group => OfGroup,
leader => Leader,
streams => format_stream_progresses(Streams),
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_lease_streams(OfGroup, Leader, Streams, Version)
),
ok;
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
emqx_ds_shared_sub_proto_v1:leader_lease_streams(
?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version
).
-spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, Version)
),
ok.
ok;
leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, Version
).
%% ...
-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
),
ok;
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
).
-spec leader_update_streams(agent(), group(), version(), version(), list(leader_stream_progress())) ->
ok.
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
?is_local_agent(ToAgent)
->
?tp(warning, shared_sub_proto_msg, #{
type => leader_update_streams,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew,
streams_new => format_stream_progresses(StreamsNew)
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
),
ok;
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
emqx_ds_shared_sub_proto_v1:leader_update_streams(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
).
-spec leader_invalidate(agent(), group()) -> ok.
leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_invalidate,
to_agent => ToAgent,
of_group => OfGroup
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_invalidate(OfGroup)
),
ok;
leader_invalidate(ToAgent, OfGroup) ->
emqx_ds_shared_sub_proto_v1:leader_invalidate(
?agent_node(ToAgent), ToAgent, OfGroup
).
%%--------------------------------------------------------------------
%% Internal API
%%--------------------------------------------------------------------
agent(Id, Pid) ->
_ = Id,
?agent(Id, Pid).
format_stream_progresses(Streams) ->
lists:map(
fun format_stream_progress/1,
Streams
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
format_progress(#{iterator := Iterator} = Progress) ->
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key({SubId, Stream}) ->
{SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_opaque(Opaque) ->
erlang:phash2(Opaque).

View File

@ -6,9 +6,6 @@
%% These messages are instantiated on the receiver's side, so they do not
%% travel over the network.
-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL).
-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true).
%% NOTE
%% We do not need any kind of request/response identification,
%% because the protocol is fully event-based.
@ -19,19 +16,22 @@
-define(agent_update_stream_states_msg, agent_update_stream_states).
-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
-define(agent_disconnect_msg, agent_disconnect).
%% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender.
-define(agent_connect_leader(Agent, TopicFilter), #{
-define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{
type => ?agent_connect_leader_msg,
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
agent_metadata => AgentMetadata,
agent => Agent
}).
-define(agent_connect_leader_match(Agent, TopicFilter), #{
-define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{
type := ?agent_connect_leader_msg,
topic_filter := TopicFilter,
share_topic_filter := ShareTopicFilter,
agent_metadata := AgentMetadata,
agent := Agent
}).
@ -49,37 +49,137 @@
agent := Agent
}).
-define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
type => ?agent_update_stream_states_msg,
stream_states => StreamStates,
version_old => VersionOld,
version_new => VersionNew,
agent => Agent
}).
-define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
type := ?agent_update_stream_states_msg,
stream_states := StreamStates,
version_old := VersionOld,
version_new := VersionNew,
agent := Agent
}).
-define(agent_disconnect(Agent, StreamStates, Version), #{
type => ?agent_disconnect_msg,
stream_states => StreamStates,
version => Version,
agent => Agent
}).
-define(agent_disconnect_match(Agent, StreamStates, Version), #{
type := ?agent_disconnect_msg,
stream_states := StreamStates,
version := Version,
agent := Agent
}).
%% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders
%% `group` field is used to identify the leader.
%% `group_id` field is used to identify the leader.
-define(leader_lease_streams_msg, leader_lease_streams).
-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
-define(leader_lease_streams(Group, Streams, Version), #{
-define(leader_lease_streams(GrouId, Leader, Streams, Version), #{
type => ?leader_lease_streams_msg,
streams => Streams,
version => Version,
group => Group
leader => Leader,
group_id => GrouId
}).
-define(leader_lease_streams_match(Group, Streams, Version), #{
-define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{
type := ?leader_lease_streams_msg,
streams := Streams,
version := Version,
group := Group
leader := Leader,
group_id := GroupId
}).
-define(leader_renew_stream_lease(Group, Version), #{
-define(leader_renew_stream_lease(GroupId, Version), #{
type => ?leader_renew_stream_lease_msg,
version => Version,
group => Group
group_id => GroupId
}).
-define(leader_renew_stream_lease_match(Group, Version), #{
-define(leader_renew_stream_lease_match(GroupId, Version), #{
type := ?leader_renew_stream_lease_msg,
version := Version,
group := Group
group_id := GroupId
}).
-define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{
type => ?leader_renew_stream_lease_msg,
version_old => VersionOld,
version_new => VersionNew,
group_id => GroupId
}).
-define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{
type := ?leader_renew_stream_lease_msg,
version_old := VersionOld,
version_new := VersionNew,
group_id := GroupId
}).
-define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{
type => leader_update_streams,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew,
group_id => GroupId
}).
-define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{
type := leader_update_streams,
version_old := VersionOld,
version_new := VersionNew,
streams_new := StreamsNew,
group_id := GroupId
}).
-define(leader_invalidate(GroupId), #{
type => leader_invalidate,
group_id => GroupId
}).
-define(leader_invalidate_match(GroupId), #{
type := leader_invalidate,
group_id := GroupId
}).
%% Helpers
%% In test mode we extend agents with (session) Id to have more
%% readable traces.
-ifdef(TEST).
-define(agent(Id, Pid), {Id, Pid}).
-define(agent_pid(Agent), element(2, Agent)).
-define(agent_node(Agent), node(element(2, Agent))).
%% -ifdef(TEST).
-else.
-define(agent(Id, Pid), Pid).
-define(agent_pid(Agent), Agent).
-define(agent_node(Agent), node(Agent)).
%% -ifdef(TEST).
-endif.
-define(is_local_agent(Agent), (?agent_node(Agent) =:= node())).
-define(leader_node(Leader), node(Leader)).
-define(is_local_leader(Leader), (?leader_node(Leader) =:= node())).

View File

@ -20,12 +20,13 @@
]).
-export([
lookup_leader/2
lookup_leader/3
]).
-record(lookup_leader, {
agent :: emqx_ds_shared_sub_proto:agent(),
topic_filter :: emqx_persistent_session_ds:share_topic_filter()
agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(),
share_topic_filter :: emqx_persistent_session_ds:share_topic_filter()
}).
-define(gproc_id(ID), {n, l, ID}).
@ -35,10 +36,14 @@
%%--------------------------------------------------------------------
-spec lookup_leader(
emqx_ds_shared_sub_proto:agent(), emqx_persistent_session_ds:share_topic_filter()
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_persistent_session_ds:share_topic_filter()
) -> ok.
lookup_leader(Agent, TopicFilter) ->
gen_server:cast(?MODULE, #lookup_leader{agent = Agent, topic_filter = TopicFilter}).
lookup_leader(Agent, AgentMetadata, ShareTopicFilter) ->
gen_server:cast(?MODULE, #lookup_leader{
agent = Agent, agent_metadata = AgentMetadata, share_topic_filter = ShareTopicFilter
}).
%%--------------------------------------------------------------------
%% Internal API
@ -66,8 +71,15 @@ init([]) ->
handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}.
handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) ->
State1 = do_lookup_leader(Agent, TopicFilter, State),
handle_cast(
#lookup_leader{
agent = Agent,
agent_metadata = AgentMetadata,
share_topic_filter = ShareTopicFilter
},
State
) ->
State1 = do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State),
{noreply, State1}.
handle_info(_Info, State) ->
@ -80,15 +92,15 @@ terminate(_Reason, _State) ->
%% Internal functions
%%--------------------------------------------------------------------
do_lookup_leader(Agent, TopicFilter, State) ->
do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12309
%% Cluster-wide unique leader election should be implemented
Id = emqx_ds_shared_sub_leader:id(TopicFilter),
Id = emqx_ds_shared_sub_leader:id(ShareTopicFilter),
LeaderPid =
case gproc:where(?gproc_id(Id)) of
undefined ->
{ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
{ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register(
Pid,
@ -104,8 +116,10 @@ do_lookup_leader(Agent, TopicFilter, State) ->
?SLOG(info, #{
msg => lookup_leader,
agent => Agent,
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
leader => LeaderPid
}),
ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter),
ok = emqx_ds_shared_sub_proto:agent_connect_leader(
LeaderPid, Agent, AgentMetadata, ShareTopicFilter
),
State.

View File

@ -0,0 +1,57 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_schema).
-include_lib("hocon/include/hoconsc.hrl").
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
namespace() -> emqx_shared_subs.
roots() ->
[
durable_queues
].
fields(durable_queues) ->
[
{enable,
?HOCON(
boolean(),
#{
required => false,
default => true,
desc => ?DESC(enable)
}
)},
duration(session_find_leader_timeout_ms, 1000),
duration(session_renew_lease_timeout_ms, 5000),
duration(session_min_update_stream_state_interval_ms, 500),
duration(leader_renew_lease_interval_ms, 1000),
duration(leader_renew_streams_interval_ms, 1000),
duration(leader_drop_timeout_interval_ms, 1000),
duration(leader_session_update_timeout_ms, 5000),
duration(leader_session_not_replaying_timeout_ms, 5000)
].
duration(MsFieldName, Default) ->
{MsFieldName,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
required => false,
default => Default,
desc => ?DESC(MsFieldName),
importance => ?IMPORTANCE_HIDDEN
}
)}.
desc(durable_queues) -> "Settings for durable queues".

View File

@ -0,0 +1,130 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_proto_v1).
-behaviour(emqx_bpapi).
-include_lib("emqx/include/bpapi.hrl").
-export([
introduced_in/0,
agent_connect_leader/5,
agent_update_stream_states/5,
agent_update_stream_states/6,
agent_disconnect/5,
leader_lease_streams/6,
leader_renew_stream_lease/4,
leader_renew_stream_lease/5,
leader_update_streams/6,
leader_invalidate/3
]).
introduced_in() ->
"5.8.0".
-spec agent_connect_leader(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_persistent_session_ds:share_topic_filter()
) -> ok.
agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [
ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
]).
-spec agent_update_stream_states(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [
ToLeader, FromAgent, StreamProgresses, Version
]).
-spec agent_update_stream_states(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [
ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
]).
-spec agent_disconnect(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_disconnect(Node, ToLeader, FromAgent, StreamProgresses, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_disconnect, [
ToLeader, FromAgent, StreamProgresses, Version
]).
%% leader -> agent messages
-spec leader_lease_streams(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:leader(),
list(emqx_ds_shared_sub_proto:leader_stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_lease_streams, [
ToAgent, OfGroup, Leader, Streams, Version
]).
-spec leader_renew_stream_lease(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_renew_stream_lease(Node, ToAgent, OfGroup, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [ToAgent, OfGroup, Version]).
-spec leader_renew_stream_lease(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_renew_stream_lease(Node, ToAgent, OfGroup, VersionOld, VersionNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [
ToAgent, OfGroup, VersionOld, VersionNew
]).
-spec leader_update_streams(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version(),
list(emqx_ds_shared_sub_proto:leader_stream_progress())
) -> ok.
leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [
ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
]).
-spec leader_invalidate(node(), emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:group()) ->
ok.
leader_invalidate(Node, ToAgent, OfGroup) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_invalidate, [ToAgent, OfGroup]).

View File

@ -10,10 +10,10 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
@ -51,29 +51,364 @@ end_per_testcase(_TC, _Config) ->
ok.
t_lease_initial(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic1/1">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello1">>, 1),
ct:sleep(2_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) ->
t_two_clients(_Config) ->
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic4/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic4/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic2/2">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello2">>, 1),
ct:sleep(2_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic4/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_client_loss(_Config) ->
process_flag(trap_exit, true),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr5/topic5/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr5/topic5/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello2">>, 1),
exit(ConnShared1, kill),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic5/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_stream_revoke(_Config) ->
process_flag(trap_exit, true),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr6/topic6/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
?assertWaitEvent(
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr6/topic6/#">>, 1),
#{
?snk_kind := shared_sub_group_sm_leader_update_streams,
stream_progresses := [_ | _],
id := <<"client_shared2">>
},
5_000
),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic6/2">>, <<"hello4">>, 1),
?assertReceive({publish, #{payload := <<"hello3">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 10_000),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_graceful_disconnect(_Config) ->
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr4/topic7/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr4/topic7/#">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 2_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 2_000),
?assertWaitEvent(
ok = emqtt:disconnect(ConnShared1),
#{?snk_kind := shared_sub_leader_disconnect_agent},
1_000
),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/1">>, <<"hello3">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic7/2">>, <<"hello4">>, 1),
%% Since the disconnect is graceful, the streams should rebalance quickly,
%% before the timeout.
?assertReceive({publish, #{payload := <<"hello3">>}}, 2_000),
?assertReceive({publish, #{payload := <<"hello4">>}}, 2_000),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_intensive_reassign(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr8/topic8/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic8/1">>, <<"topic8/2">>, <<"topic8/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
ConnShared3 = emqtt_connect_sub(<<"client_shared3">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr8/topic8/#">>, 1),
{ok, _, _} = emqtt:subscribe(ConnShared3, <<"$share/gr8/topic8/#">>, 1),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>;
ConnShared3 -> <<"client_shared3">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnShared3),
ok = emqtt:disconnect(ConnPub).
t_unsubscribe(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1),
{ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr9/topic9/#">>),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_quick_resubscribe(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic10/1">>, <<"topic10/2">>, <<"topic10/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr10/topic10/#">>, 1),
ok = lists:foreach(
fun(_) ->
{ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1),
ct:sleep(5)
end,
lists:seq(1, 10)
),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_disconnect_no_double_replay1(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr11/topic11/#">>, 1),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr11/topic11/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic11/1">>, <<"topic11/2">>, <<"topic11/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ok = emqtt:disconnect(ConnShared2),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, _Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
%% We cannnot garantee that the message are not duplicated until we are able
%% to send progress of a partially replayed stream range to the leader.
% ?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnPub).
t_disconnect_no_double_replay2(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>, [{auto_ack, false}]),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr12/topic12/#">>, 1),
ct:sleep(1000),
ok = publish_n(ConnPub, [<<"topic12/1">>], 1, 20),
receive
{publish, #{payload := <<"1">>, packet_id := PacketId1}} ->
ok = emqtt:puback(ConnShared1, PacketId1)
after 5000 ->
ct:fail("No publish received")
end,
ok = emqtt:disconnect(ConnShared1),
ConnShared12 = emqtt_connect_sub(<<"client_shared12">>),
{ok, _, _} = emqtt:subscribe(ConnShared12, <<"$share/gr12/topic12/#">>, 1),
%% We cannnot garantee that the message is not duplicated until we are able
%% to send progress of a partially replayed stream range to the leader.
% ?assertNotReceive(
% {publish, #{payload := <<"1">>}},
% 3000
% ),
ok = emqtt:disconnect(ConnShared12).
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
@ -93,7 +428,6 @@ t_lease_reconnect(_Config) ->
5_000
),
ct:sleep(1_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
@ -114,7 +448,7 @@ t_renew_lease_timeout(_Config) ->
?wait_async_action(
ok = terminate_leaders(),
#{?snk_kind := leader_lease_streams},
5_000
10_000
),
fun(Trace) ->
?strict_causality(
@ -131,28 +465,24 @@ t_renew_lease_timeout(_Config) ->
%% Helper functions
%%--------------------------------------------------------------------
init_streams(ConnPub, Topic) ->
ConnRegular = emqtt_connect_sub(<<"client_regular">>),
{ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
ok = emqtt:disconnect(ConnRegular).
emqtt_connect_sub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},
{clean_start, true},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7_200}}
]),
emqtt_connect_sub(ClientId, []).
emqtt_connect_sub(ClientId, Options) ->
{ok, C} = emqtt:start_link(
[
{clientid, ClientId},
{clean_start, true},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7_200}}
] ++ Options
),
{ok, _} = emqtt:connect(C),
C.
emqtt_connect_pub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},
{clientid, ClientId},
{clean_start, true},
{proto_ver, v5}
]),
@ -163,3 +493,53 @@ terminate_leaders() ->
ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
{ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
ok.
publish_n(_Conn, _Topics, From, To) when From > To ->
ok;
publish_n(Conn, [Topic | RestTopics], From, To) ->
{ok, _} = emqtt:publish(Conn, Topic, integer_to_binary(From), 1),
publish_n(Conn, RestTopics ++ [Topic], From + 1, To).
drain_publishes() ->
drain_publishes([]).
drain_publishes(Acc) ->
receive
{publish, Msg} ->
drain_publishes([Msg | Acc])
after 5_000 ->
lists:reverse(Acc)
end.
verify_received_pubs(Pubs, NPubs, ClientByBid) ->
Messages = lists:foldl(
fun(#{payload := Payload, client_pid := Pid}, Acc) ->
maps:update_with(
binary_to_integer(Payload),
fun(Clients) ->
[ClientByBid(Pid) | Clients]
end,
[ClientByBid(Pid)],
Acc
)
end,
#{},
Pubs
),
Missing = lists:filter(
fun(N) -> not maps:is_key(N, Messages) end,
lists:seq(1, NPubs)
),
Duplicate = lists:filtermap(
fun(N) ->
case Messages of
#{N := [_]} -> false;
#{N := [_ | _] = Clients} -> {true, {N, Clients}};
_ -> false
end
end,
lists:seq(1, NPubs)
),
{Missing, Duplicate}.

View File

@ -0,0 +1,140 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-import(
emqx_mgmt_api_test_util,
[
request_api/2,
request/3,
uri/1
]
).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, #{
config => #{
<<"durable_sessions">> => #{
<<"enable">> => true,
<<"renew_streams_interval">> => "100ms"
},
<<"durable_storage">> => #{
<<"messages">> => #{
<<"backend">> => <<"builtin_raft">>
}
}
}
}},
emqx_ds_shared_sub,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => ?config(priv_dir, Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TC, _Config) ->
ok = snabbkaffe:stop(),
ok = terminate_leaders(),
ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
t_basic_crud(_Config) ->
?assertMatch(
{ok, []},
api_get(["durable_queues"])
),
?assertMatch(
{ok, 200, #{
<<"id">> := <<"q1">>
}},
api(put, ["durable_queues", "q1"], #{})
),
?assertMatch(
{error, {_, 404, _}},
api_get(["durable_queues", "q2"])
),
?assertMatch(
{ok, 200, #{
<<"id">> := <<"q2">>
}},
api(put, ["durable_queues", "q2"], #{})
),
?assertMatch(
{ok, #{
<<"id">> := <<"q2">>
}},
api_get(["durable_queues", "q2"])
),
?assertMatch(
{ok, [#{<<"id">> := <<"q2">>}, #{<<"id">> := <<"q1">>}]},
api_get(["durable_queues"])
),
?assertMatch(
{ok, 200, <<"Queue deleted">>},
api(delete, ["durable_queues", "q2"], #{})
),
?assertMatch(
{ok, [#{<<"id">> := <<"q1">>}]},
api_get(["durable_queues"])
).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
api_get(Path) ->
case request_api(get, uri(Path)) of
{ok, ResponseBody} ->
{ok, jiffy:decode(list_to_binary(ResponseBody), [return_maps])};
{error, _} = Error ->
Error
end.
api(Method, Path, Data) ->
case request(Method, uri(Path), Data) of
{ok, Code, ResponseBody} ->
Res =
case emqx_utils_json:safe_decode(ResponseBody, [return_maps]) of
{ok, Decoded} -> Decoded;
{error, _} -> ResponseBody
end,
{ok, Code, Res};
{error, _} = Error ->
Error
end.
terminate_leaders() ->
ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
{ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
ok.

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_config_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
{emqx, #{
config => #{
<<"durable_sessions">> => #{
<<"enable">> => true,
<<"renew_streams_interval">> => "100ms"
},
<<"durable_storage">> => #{
<<"messages">> => #{
<<"backend">> => <<"builtin_raft">>
}
}
}
}},
{emqx_ds_shared_sub, #{
config => #{
<<"durable_queues">> => #{
<<"enable">> => true,
<<"session_find_leader_timeout_ms">> => "1200ms"
}
}
}}
],
#{work_dir => ?config(priv_dir, Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
t_update_config(_Config) ->
?assertEqual(
1200,
emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms)
),
{ok, _} = emqx_conf:update([durable_queues], #{session_find_leader_timeout_ms => 2000}, #{}),
?assertEqual(
2000,
emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms)
).

View File

@ -0,0 +1,125 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_mgmt_api_subscription_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"durable_sessions {\n"
" enable = true\n"
" renew_streams_interval = 10ms\n"
"}"},
{emqx_ds_shared_sub, #{
config => #{
<<"durable_queues">> => #{
<<"enable">> => true,
<<"session_find_leader_timeout_ms">> => "1200ms"
}
}
}},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_TC, Config) ->
ClientConfig = #{
username => ?USERNAME,
clientid => ?CLIENTID,
proto_ver => v5,
clean_start => true,
properties => #{'Session-Expiry-Interval' => 300}
},
{ok, Client} = emqtt:start_link(ClientConfig),
{ok, _} = emqtt:connect(Client),
[{client_config, ClientConfig}, {client, Client} | Config].
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client).
t_list_with_shared_sub(_Config) ->
Client = proplists:get_value(client, _Config),
RealTopic = <<"t/+">>,
Topic = <<"$share/g1/", RealTopic/binary>>,
{ok, _, _} = emqtt:subscribe(Client, Topic),
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
QS0 = [
{"clientid", ?CLIENTID},
{"match_topic", "t/#"}
],
Headers = emqx_mgmt_api_test_util:auth_header_(),
?assertMatch(
#{<<"data">> := [#{<<"clientid">> := ?CLIENTID}, #{<<"clientid">> := ?CLIENTID}]},
request_json(get, QS0, Headers)
),
QS1 = [
{"clientid", ?CLIENTID},
{"share_group", "g1"}
],
?assertMatch(
#{<<"data">> := [#{<<"clientid">> := ?CLIENTID, <<"topic">> := <<"$share/g1/t/+">>}]},
request_json(get, QS1, Headers)
).
t_list_with_invalid_match_topic(Config) ->
Client = proplists:get_value(client, Config),
RealTopic = <<"t/+">>,
Topic = <<"$share/g1/", RealTopic/binary>>,
{ok, _, _} = emqtt:subscribe(Client, Topic),
{ok, _, _} = emqtt:subscribe(Client, RealTopic),
QS = [
{"clientid", ?CLIENTID},
{"match_topic", "$share/g1/t/1"}
],
Headers = emqx_mgmt_api_test_util:auth_header_(),
?assertMatch(
{error,
{{_, 400, _}, _, #{
<<"message">> := <<"match_topic_invalid">>,
<<"code">> := <<"INVALID_PARAMETER">>
}}},
begin
{error, {R, _H, Body}} = emqx_mgmt_api_test_util:request_api(
get, path(), uri_string:compose_query(QS), Headers, [], #{return_all => true}
),
{error, {R, _H, emqx_utils_json:decode(Body, [return_maps])}}
end
),
ok.
request_json(Method, Query, Headers) when is_list(Query) ->
Qs = uri_string:compose_query(Query),
{ok, MatchRes} = emqx_mgmt_api_test_util:request_api(Method, path(), Qs, Headers),
emqx_utils_json:decode(MatchRes, [return_maps]).
path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]).

View File

@ -18,7 +18,8 @@
emqx_schema_registry_schema,
emqx_schema_validation_schema,
emqx_message_transformation_schema,
emqx_ft_schema
emqx_ft_schema,
emqx_ds_shared_sub_schema
]).
%% Callback to upgrade config after loaded from config file but before validation.

View File

@ -242,20 +242,25 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} =
%% TODO: filtering by client ID can be implemented more efficiently:
FilterTopic = maps:get(<<"topic">>, QString, '_'),
Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic),
SubPred = fun(Sub) ->
compare_optional(<<"topic">>, QString, topic, Sub) andalso
compare_optional(<<"topic">>, QString, '_real_topic', Sub) andalso
compare_optional(<<"clientid">>, QString, clientid, Sub) andalso
compare_optional(<<"qos">>, QString, qos, Sub) andalso
compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub)
compare_optional(<<"share_group">>, QString, '_group', Sub) andalso
compare_match_topic_optional(<<"match_topic">>, QString, '_real_topic', Sub)
end,
NDropped = (Page - 1) * Limit,
{_, Stream} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0
),
{Subscriptions, Stream1} = consume_n_matching(
{Subscriptions0, Stream1} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, Limit, Stream
),
HasNext = Stream1 =/= [],
Subscriptions1 = lists:map(
fun remove_temp_match_fields/1, Subscriptions0
),
Meta =
case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of
true ->
@ -276,7 +281,7 @@ do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} =
#{
meta => Meta,
data => Subscriptions
data => Subscriptions1
}.
compare_optional(QField, Query, SField, Subscription) ->
@ -328,29 +333,63 @@ consume_n_matching(Map, Pred, N, S0, Acc) ->
end
end.
persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) ->
case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of
#{subopts := SubOpts} ->
#{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
#{
topic => Topic,
clientid => SessionId,
node => all,
persistent_route_to_subscription(#route{dest = Dest} = Route) ->
Sub =
case get_client_subscription(Route) of
#{subopts := SubOpts} ->
#{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
#{
topic => format_topic(Route),
clientid => session_id(Dest),
node => all,
qos => Qos,
nl => Nl,
rh => Rh,
rap => Rap,
durable => true
};
undefined ->
#{
topic => Topic,
clientid => SessionId,
node => all,
durable => true
}
end.
qos => Qos,
nl => Nl,
rh => Rh,
rap => Rap,
durable => true
};
undefined ->
#{
topic => format_topic(Route),
clientid => session_id(Dest),
node => all,
durable => true
}
end,
add_temp_match_fields(Route, Sub).
get_client_subscription(#route{
topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}
}) ->
emqx_persistent_session_ds:get_client_subscription(SessionId, #share{
topic = Topic, group = Group
});
get_client_subscription(#route{topic = Topic, dest = SessionId}) ->
emqx_persistent_session_ds:get_client_subscription(SessionId, Topic).
session_id(#share_dest{session_id = SessionId}) -> SessionId;
session_id(SessionId) -> SessionId.
add_temp_match_fields(Route, Sub) ->
add_temp_match_fields(['_real_topic', '_group'], Route, Sub).
add_temp_match_fields([], _Route, Sub) ->
Sub;
add_temp_match_fields(['_real_topic' | Rest], #route{topic = Topic} = Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_real_topic' => Topic});
add_temp_match_fields(['_group' | Rest], #route{dest = #share_dest{group = Group}} = Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_group' => Group});
add_temp_match_fields(['_group' | Rest], Route, Sub) ->
add_temp_match_fields(Rest, Route, Sub#{'_group' => undefined}).
remove_temp_match_fields(Sub) ->
maps:without(['_real_topic', '_group'], Sub).
format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) ->
<<"$share/", Group/binary, "/", Topic/binary>>;
format_topic(#route{topic = Topic}) ->
Topic.
%% @private This function merges paginated results from two sources.
%%

View File

@ -47,9 +47,12 @@ groups() ->
CommonTCs = AllTCs -- persistent_only_tcs(),
[
{mem, CommonTCs},
%% Shared subscriptions are currently not supported:
%% Persistent shared subscriptions are an EE app.
%% So they are tested outside emqx_management app which is CE.
{persistent,
(CommonTCs -- [t_list_with_shared_sub, t_subscription_api]) ++ persistent_only_tcs()}
(CommonTCs --
[t_list_with_shared_sub, t_list_with_invalid_match_topic, t_subscription_api]) ++
persistent_only_tcs()}
].
persistent_only_tcs() ->

View File

@ -0,0 +1,34 @@
emqx_ds_shared_sub_api {
param_queue_id.desc:
"""The ID of the durable queue."""
param_queue_id.label:
"""Queue ID"""
durable_queues_get.desc:
"""Get the list of durable queues."""
durable_queues_get.label:
"""Durable Queues"""
durable_queue_get.desc:
"""Get the information of a durable queue."""
durable_queue_get.label:
"""Durable Queue"""
durable_queue_delete.desc:
"""Delete a durable queue."""
durable_queue_delete.label:
"""Delete Durable Queue"""
durable_queues_put.desc:
"""Create a durable queue."""
durable_queues_put.label:
"""Create Durable Queue"""
}

View File

@ -0,0 +1,63 @@
emqx_ds_shared_sub_schema {
enable.desc:
"""Enable the shared subscription feature."""
enable.label:
"""Enable Shared Subscription"""
session_find_leader_timeout_ms.desc:
"""The timeout in milliseconds for the session to find a leader.
If the session cannot find a leader within this time, the session will retry."""
session_find_leader_timeout_ms.label:
"""Session Find Leader Timeout"""
session_renew_lease_timeout_ms.desc:
"""The timeout in milliseconds for the session to wait for the leader to renew the lease.
If the leader does not renew the lease within this time, the session will consider
the leader as lost and try to find a new leader."""
session_renew_lease_timeout_ms.label:
"""Session Renew Lease Timeout"""
session_min_update_stream_state_interval_ms.desc:
"""The minimum interval in milliseconds for the session to update the stream state.
If session has no updates for the stream state within this time, the session will
send empty updates."""
session_min_update_stream_state_interval_ms.label:
"""Session Min Update Stream State Interval"""
leader_renew_lease_interval_ms.desc:
"""The interval in milliseconds for the leader to renew the lease."""
leader_renew_lease_interval_ms.label:
"""Leader Renew Lease Interval"""
leader_renew_streams_interval_ms.desc:
"""The interval in milliseconds for the leader to renew the streams."""
leader_renew_streams_interval_ms.label:
"""Leader Renew Streams Interval"""
leader_drop_timeout_interval_ms.desc:
"""The interval in milliseconds for the leader to drop non-responsive sessions."""
leader_drop_timeout_interval_ms.label:
"""Leader Drop Timeout Interval"""
leader_session_update_timeout_ms.desc:
"""The timeout in milliseconds for the leader to wait for the session to update the stream state.
If the session does not update the stream state within this time, the leader will drop the session."""
leader_session_update_timeout_ms.label:
"""Leader Session Update Timeout"""
leader_session_not_replaying_timeout_ms.desc:
"""The timeout in milliseconds for the leader to wait for the session leave intermediate states."""
leader_session_not_replaying_timeout_ms.label:
"""Leader Session Not Replaying Timeout"""
}