feat(queue): implement unsubscribe

This commit is contained in:
Ilya Averyanov 2024-07-02 23:14:42 +03:00
parent 9bde981c44
commit b4a010d63b
6 changed files with 326 additions and 117 deletions

View File

@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) ->
Session = replay_streams(Session0, ClientInfo), Session = replay_streams(Session0, ClientInfo),
{ok, [], Session}; {ok, [], Session};
handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) -> handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) ->
S1 = emqx_persistent_session_ds_subs:gc(S0), %% `gc` and `renew_streams` methods may drop unsubscribed streams.
S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), %% Shared subscription handler must have a chance to see unsubscribed streams
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0), %% in the fully replayed state.
{S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0),
S2 = emqx_persistent_session_ds_subs:gc(S1),
S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2),
{S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1),
Interval = get_config(ClientInfo, [renew_streams_interval]), Interval = get_config(ClientInfo, [renew_streams_interval]),
Session = emqx_session:ensure_timer( Session = emqx_session:ensure_timer(
?TIMER_GET_STREAMS, ?TIMER_GET_STREAMS,

View File

@ -24,8 +24,24 @@
to_map/2 to_map/2
]). ]).
-define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe).
-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}.
-type scheduled_action_type() ::
{?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe.
-type scheduled_action() :: #{
type := scheduled_action_type(),
stream_keys_to_wait := [stream_key()],
progresses := [emqx_ds_shared_sub_proto:agent_stream_progress()]
}.
-type t() :: #{ -type t() :: #{
agent := emqx_persistent_session_ds_shared_subs_agent:t() agent := emqx_persistent_session_ds_shared_subs_agent:t(),
scheduled_actions := #{
share_topic_filter() => scheduled_action()
}
}. }.
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{ -type opts() :: #{
@ -44,7 +60,8 @@ new(Opts) ->
#{ #{
agent => emqx_persistent_session_ds_shared_subs_agent:new( agent => emqx_persistent_session_ds_shared_subs_agent:new(
agent_opts(Opts) agent_opts(Opts)
) ),
scheduled_actions => #{}
}. }.
-spec open(emqx_persistent_session_ds_state:t(), opts()) -> -spec open(emqx_persistent_session_ds_state:t(), opts()) ->
@ -80,32 +97,29 @@ on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
) -> ) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}. | {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) -> on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) ->
case lookup(TopicFilter, S0) of case lookup(TopicFilter, S0) of
undefined -> undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}; {error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription -> #{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{ ?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter session_id => SessionId, topic_filter => TopicFilter
}), }),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent0, TopicFilter
),
SharedSubS = SharedSubS0#{agent => Agent1},
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, TopicFilter),
{ok, S, SharedSubS, Subscription} {ok, S, SharedSubS, Subscription}
end. end.
-spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) ->
{emqx_persistent_session_ds_state:t(), t()}. {emqx_persistent_session_ds_state:t(), t()}.
renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) ->
{StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams(
Agent0 Agent0
), ),
?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}), ?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}),
S1 = lists:foldl( S1 = lists:foldl(
fun fun
(#{type := lease} = Event, S) -> accept_stream(Event, S); (#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions);
(#{type := revoke} = Event, S) -> revoke_stream(Event, S) (#{type := revoke} = Event, S) -> revoke_stream(Event, S)
end, end,
S0, S0,
@ -118,19 +132,23 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) ->
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds_state:t(),
t() t()
) -> {emqx_persistent_session_ds_state:t(), t()}. ) -> {emqx_persistent_session_ds_state:t(), t()}.
on_streams_replay(S, #{agent := Agent0} = SharedSubS0) -> on_streams_replay(S, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0) ->
Progresses = stream_progresses(S), Progresses = stream_progresses(S),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, Progresses Agent0, Progresses
), ),
SharedSubS1 = SharedSubS0#{agent => Agent1}, {Agent2, ScheduledActions1} = run_scheduled_actions(S, Agent1, ScheduledActions0),
SharedSubS1 = SharedSubS0#{
agent => Agent2,
scheduled_actions => ScheduledActions1
},
{S, SharedSubS1}. {S, SharedSubS1}.
on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
S1 = revoke_all_streams(S0), S1 = revoke_all_streams(S0),
Progresses = stream_progresses(S1), Progresses = stream_progresses(S1),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses),
SharedSubS1 = SharedSubS0#{agent => Agent1}, SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}},
{S1, SharedSubS1}. {S1, SharedSubS1}.
-spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) ->
@ -149,9 +167,79 @@ to_map(_S, _SharedSubS) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(TopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, TopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(TopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{TopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
ScheduledActions
).
run_scheduled_action(
S,
Agent,
TopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
StreamKeysToWait1 = lists:filter(
fun({_SubId, _Stream} = Key) ->
case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined ->
%% This should not happen: we should see any stream
%% in completed state before deletion
true;
SRS ->
not is_stream_fully_acked(S, SRS)
end
end,
StreamKeysToWait0
),
Progresses1 =
lists:map(
fun({_SubId, Stream} = Key) ->
#srs{it_end = ItEnd} = SRS = emqx_persistent_session_ds_state:get_stream(Key, S),
#{
stream => Stream,
iterator => ItEnd,
use_finished => is_use_finished(S, SRS)
}
end,
(StreamKeysToWait0 -- StreamKeysToWait1)
) ++ Progresses0,
case StreamKeysToWait1 of
[] ->
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent, TopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent, TopicFilter, Progresses1
)}
end;
_ ->
{continue, Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}}
end.
stream_progresses(S) -> stream_progresses(S) ->
fold_shared_stream_states( fold_shared_stream_states(
fun(TopicFilter, Stream, SRS, Acc) -> fun(
#share{group = Group},
Stream,
SRS,
ProgressesAcc0
) ->
#srs{it_end = EndIt} = SRS, #srs{it_end = EndIt} = SRS,
case is_stream_fully_acked(S, SRS) of case is_stream_fully_acked(S, SRS) of
@ -159,17 +247,22 @@ stream_progresses(S) ->
%% TODO %% TODO
%% Is it sufficient for a report? %% Is it sufficient for a report?
StreamProgress = #{ StreamProgress = #{
topic_filter => TopicFilter,
stream => Stream, stream => Stream,
iterator => EndIt, iterator => EndIt,
use_finished => is_use_finished(S, SRS) use_finished => is_use_finished(S, SRS),
is_fully_acked => true
}, },
[StreamProgress | Acc]; maps:update_with(
Group,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
);
false -> false ->
Acc ProgressesAcc0
end end
end, end,
[], #{},
S S
). ).
@ -222,14 +315,16 @@ on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
-dialyzer({nowarn_function, create_new_subscription/3}). -dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{ create_new_subscription(TopicFilter, SubOpts, #{
id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) -> }) ->
case case
emqx_persistent_session_ds_shared_subs_agent:on_subscribe( emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent0, TopicFilter, SubOpts Agent, TopicFilter, SubOpts
) )
of of
{ok, Agent1} -> ok ->
#{upgrade_qos := UpgradeQoS} = Props, #{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -247,10 +342,7 @@ create_new_subscription(TopicFilter, SubOpts, #{
S = emqx_persistent_session_ds_state:put_subscription( S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3 TopicFilter, Subscription, S3
), ),
SharedSubS = SharedSubS0#{agent => Agent1}, SharedSubS = schedule_subscribe(SharedSubS0, TopicFilter, SubOpts),
?tp(persistent_session_ds_shared_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
{ok, S, SharedSubS}; {ok, S, SharedSubS};
{error, _} = Error -> {error, _} = Error ->
Error Error
@ -289,15 +381,25 @@ lookup(TopicFilter, S) ->
undefined undefined
end. end.
accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replay it. We won't use it anyway:
%% * if subscribe is pending, we will reset agent obtain a new lease
%% * if unsubscribe is pending, we will drop connection
case ScheduledActions of
#{TopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
end.
accept_stream( accept_stream(
#{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0 #{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0
) -> ) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
undefined -> undefined ->
%% This should not happen. %% We unsubscribed
%% Agent should have received unsubscribe callback S0;
%% and should not have passed this stream as a new one
error(new_stream_without_sub);
#{id := SubId, current_state := SStateId} -> #{id := SubId, current_state := SStateId} ->
Key = {SubId, Stream}, Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S0) of case emqx_persistent_session_ds_state:get_stream(Key, S0) of
@ -347,6 +449,57 @@ revoke_all_streams(S0) ->
S0 S0
). ).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts
) ->
case ScheduledActions0 of
#{TopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter
) ->
case ScheduledActions0 of
#{TopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
TopicFilter => ScheduledAction#{type => ?schedule_unsubscribe}
},
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
StreamIdsToFinalize = stream_ids_by_sub_id(S, UnsubscridedSubId),
ScheduledActions1 = ScheduledActions0#{
TopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamIdsToFinalize,
progresses => []
}
},
SharedSubS0#{scheduled_actions := ScheduledActions1}
end.
stream_ids_by_sub_id(S, MatchSubId) ->
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, _Stream} = StreamStateId, _SRS, StreamStateIds) ->
case SubId of
MatchSubId ->
[StreamStateId | StreamStateIds];
_ ->
StreamStateIds
end
end,
[],
S
).
-spec to_agent_subscription( -spec to_agent_subscription(
emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription() emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()
) -> ) ->

View File

@ -59,9 +59,10 @@
-export([ -export([
new/1, new/1,
open/2, open/2,
can_subscribe/3,
on_subscribe/3, on_subscribe/3,
on_unsubscribe/2, on_unsubscribe/3,
on_stream_progress/2, on_stream_progress/2,
on_info/2, on_info/2,
on_disconnect/2, on_disconnect/2,
@ -80,12 +81,12 @@
-callback new(opts()) -> t(). -callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t(). -callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> -callback can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
{ok, t()} | {error, term()}. -callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), topic_filter()) -> t(). -callback on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), [stream_progress()]) -> t(). -callback on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), [stream_progress()]) -> t(). -callback on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t(). -callback on_info(t(), term()) -> t().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -100,16 +101,19 @@ new(Opts) ->
open(Topics, Opts) -> open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts). ?shared_subs_agent:open(Topics, Opts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> -spec can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
{ok, t()} | {error, emqx_types:reason_code()}. can_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, TopicFilter, SubOpts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, TopicFilter, SubOpts) -> on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts). ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter()) -> t(). -spec on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, TopicFilter) -> on_unsubscribe(Agent, TopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter, StreamProgresses).
-spec on_disconnect(t(), [stream_progress()]) -> t(). -spec on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) -> on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses). ?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
@ -117,7 +121,7 @@ on_disconnect(Agent, StreamProgresses) ->
renew_streams(Agent) -> renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent). ?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), [stream_progress()]) -> t(). -spec on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) -> on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress). ?shared_subs_agent:on_stream_progress(Agent, StreamProgress).

View File

@ -9,9 +9,10 @@
-export([ -export([
new/1, new/1,
open/2, open/2,
can_subscribe/3,
on_subscribe/3, on_subscribe/3,
on_unsubscribe/2, on_unsubscribe/3,
on_stream_progress/2, on_stream_progress/2,
on_info/2, on_info/2,
on_disconnect/2, on_disconnect/2,
@ -31,10 +32,13 @@ new(_Opts) ->
open(_Topics, _Opts) -> open(_Topics, _Opts) ->
undefined. undefined.
on_subscribe(_Agent, _TopicFilter, _SubOpts) -> can_subscribe(_Agent, _TopicFilter, _SubOpts) ->
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}. {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}.
on_unsubscribe(Agent, _TopicFilter) -> on_subscribe(Agent, _TopicFilter, _SubOpts) ->
Agent.
on_unsubscribe(Agent, _TopicFilter, _Progresses) ->
Agent. Agent.
on_disconnect(Agent, _) -> on_disconnect(Agent, _) ->

View File

@ -12,9 +12,10 @@
-export([ -export([
new/1, new/1,
open/2, open/2,
can_subscribe/3,
on_subscribe/3, on_subscribe/3,
on_unsubscribe/2, on_unsubscribe/3,
on_stream_progress/2, on_stream_progress/2,
on_info/2, on_info/2,
on_disconnect/2, on_disconnect/2,
@ -47,40 +48,38 @@ open(TopicSubscriptions, Opts) ->
), ),
State1. State1.
on_subscribe(State0, TopicFilter, _SubOpts) -> can_subscribe(_State, _TopicFilter, _SubOpts) ->
State1 = add_group_subscription(State0, TopicFilter), ok.
{ok, State1}.
on_unsubscribe(State, TopicFilter) -> on_subscribe(State0, TopicFilter, _SubOpts) ->
delete_group_subscription(State, TopicFilter). add_group_subscription(State0, TopicFilter).
on_unsubscribe(State, TopicFilter, GroupProgress) ->
delete_group_subscription(State, TopicFilter, GroupProgress).
renew_streams(#{} = State) -> renew_streams(#{} = State) ->
fetch_stream_events(State). fetch_stream_events(State).
on_stream_progress(State, StreamProgresses) -> on_stream_progress(State, StreamProgresses) ->
ProgressesByGroup = stream_progresses_by_group(StreamProgresses), maps:fold(
lists:foldl( fun(Group, GroupProgresses, StateAcc) ->
fun({Group, GroupProgresses}, StateAcc) ->
with_group_sm(StateAcc, Group, fun(GSM) -> with_group_sm(StateAcc, Group, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses) emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses)
end) end)
end, end,
State, State,
maps:to_list(ProgressesByGroup) StreamProgresses
). ).
on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
ProgressesByGroup = stream_progresses_by_group(StreamProgresses), ok = maps:foreach(
Groups1 = maps:fold( fun(Group, GroupSM0) ->
fun(Group, GroupSM0, GroupsAcc) -> GroupProgresses = maps:get(Group, StreamProgresses, []),
GroupProgresses = maps:get(Group, ProgressesByGroup, []), emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses)
GroupSM1 = emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses),
GroupsAcc#{Group => GroupSM1}
end, end,
#{},
Groups0 Groups0
), ),
State#{groups => Groups1}. State#{groups => #{}}.
on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) -> on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -152,9 +151,14 @@ init_state(Opts) ->
groups => #{} groups => #{}
}. }.
delete_group_subscription(State, _ShareTopicFilter) -> delete_group_subscription(State, #share{group = Group}, GroupProgress) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12572 case State of
State. #{groups := #{Group := GSM} = Groups} ->
_ = emqx_ds_shared_sub_group_sm:handle_disconnect(GSM, GroupProgress),
State#{groups => maps:remove(Group, Groups)};
_ ->
State
end.
add_group_subscription( add_group_subscription(
#{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
@ -209,20 +213,3 @@ with_group_sm(State, Group, Fun) ->
%% Error? %% Error?
State State
end. end.
stream_progresses_by_group(StreamProgresses) ->
lists:foldl(
fun(#{topic_filter := #share{group = Group}} = Progress0, Acc) ->
Progress1 = maps:remove(topic_filter, Progress0),
maps:update_with(
Group,
fun(GroupStreams0) ->
[Progress1 | GroupStreams0]
end,
[Progress1],
Acc
)
end,
#{},
StreamProgresses
).

View File

@ -221,35 +221,7 @@ t_intensive_reassign(_Config) ->
end end
end, end,
Messages = lists:foldl( {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
fun(#{payload := Payload, client_pid := Pid}, Acc) ->
maps:update_with(
binary_to_integer(Payload),
fun(Clients) ->
[ClientByBid(Pid) | Clients]
end,
[ClientByBid(Pid)],
Acc
)
end,
#{},
Pubs
),
Missing = lists:filter(
fun(N) -> not maps:is_key(N, Messages) end,
lists:seq(1, 2 * NPubs)
),
Duplicate = lists:filtermap(
fun(N) ->
case Messages of
#{N := [_]} -> false;
#{N := [_ | _] = Clients} -> {true, {N, Clients}};
_ -> false
end
end,
lists:seq(1, 2 * NPubs)
),
?assertEqual( ?assertEqual(
[], [],
@ -266,6 +238,58 @@ t_intensive_reassign(_Config) ->
ok = emqtt:disconnect(ConnShared3), ok = emqtt:disconnect(ConnShared3),
ok = emqtt:disconnect(ConnPub). ok = emqtt:disconnect(ConnPub).
t_unsubscribe(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1),
{ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr9/topic9/#">>),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual(
[],
Missing
),
?assertEqual(
[],
Duplicate
),
ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2),
ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) -> t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>), ConnPub = emqtt_connect_pub(<<"client_pub">>),
@ -364,3 +388,36 @@ drain_publishes(Acc) ->
after 5_000 -> after 5_000 ->
lists:reverse(Acc) lists:reverse(Acc)
end. end.
verify_received_pubs(Pubs, NPubs, ClientByBid) ->
Messages = lists:foldl(
fun(#{payload := Payload, client_pid := Pid}, Acc) ->
maps:update_with(
binary_to_integer(Payload),
fun(Clients) ->
[ClientByBid(Pid) | Clients]
end,
[ClientByBid(Pid)],
Acc
)
end,
#{},
Pubs
),
Missing = lists:filter(
fun(N) -> not maps:is_key(N, Messages) end,
lists:seq(1, NPubs)
),
Duplicate = lists:filtermap(
fun(N) ->
case Messages of
#{N := [_]} -> false;
#{N := [_ | _] = Clients} -> {true, {N, Clients}};
_ -> false
end
end,
lists:seq(1, NPubs)
),
{Missing, Duplicate}.