diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 62e6bdd26..517681f9a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -621,9 +621,13 @@ handle_timeout(ClientInfo, ?TIMER_RETRY_REPLAY, Session0) -> Session = replay_streams(Session0, ClientInfo), {ok, [], Session}; handle_timeout(ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0, shared_sub_s := SharedSubS0}) -> - S1 = emqx_persistent_session_ds_subs:gc(S0), - S2 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1), - {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S2, SharedSubS0), + %% `gc` and `renew_streams` methods may drop unsubscribed streams. + %% Shared subscription handler must have a chance to see unsubscribed streams + %% in the fully replayed state. + {S1, SharedSubS1} = emqx_persistent_session_ds_shared_subs:on_streams_replay(S0, SharedSubS0), + S2 = emqx_persistent_session_ds_subs:gc(S1), + S3 = emqx_persistent_session_ds_stream_scheduler:renew_streams(S2), + {S, SharedSubS} = emqx_persistent_session_ds_shared_subs:renew_streams(S3, SharedSubS1), Interval = get_config(ClientInfo, [renew_streams_interval]), Session = emqx_session:ensure_timer( ?TIMER_GET_STREAMS, diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index ad00fadbd..94bd2c82f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -24,8 +24,24 @@ to_map/2 ]). +-define(schedule_subscribe, schedule_subscribe). +-define(schedule_unsubscribe, schedule_unsubscribe). + +-type stream_key() :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}. + +-type scheduled_action_type() :: + {?schedule_subscribe, emqx_types:subopts()} | ?schedule_unsubscribe. +-type scheduled_action() :: #{ + type := scheduled_action_type(), + stream_keys_to_wait := [stream_key()], + progresses := [emqx_ds_shared_sub_proto:agent_stream_progress()] +}. + -type t() :: #{ - agent := emqx_persistent_session_ds_shared_subs_agent:t() + agent := emqx_persistent_session_ds_shared_subs_agent:t(), + scheduled_actions := #{ + share_topic_filter() => scheduled_action() + } }. -type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type opts() :: #{ @@ -44,7 +60,8 @@ new(Opts) -> #{ agent => emqx_persistent_session_ds_shared_subs_agent:new( agent_opts(Opts) - ) + ), + scheduled_actions => #{} }. -spec open(emqx_persistent_session_ds_state:t(), opts()) -> @@ -80,32 +97,29 @@ on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) -> ) -> {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} | {error, emqx_types:reason_code()}. -on_unsubscribe(SessionId, TopicFilter, S0, #{agent := Agent0} = SharedSubS0) -> +on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) -> case lookup(TopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription -> + #{id := SubId} = Subscription -> ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), - Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe( - Agent0, TopicFilter - ), - SharedSubS = SharedSubS0#{agent => Agent1}, S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), + SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, TopicFilter), {ok, S, SharedSubS, Subscription} end. -spec renew_streams(emqx_persistent_session_ds_state:t(), t()) -> {emqx_persistent_session_ds_state:t(), t()}. -renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> +renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = SharedSubS0) -> {StreamLeaseEvents, Agent1} = emqx_persistent_session_ds_shared_subs_agent:renew_streams( Agent0 ), ?tp(info, shared_subs_new_stream_lease_events, #{stream_lease_events => StreamLeaseEvents}), S1 = lists:foldl( fun - (#{type := lease} = Event, S) -> accept_stream(Event, S); + (#{type := lease} = Event, S) -> accept_stream(Event, S, ScheduledActions); (#{type := revoke} = Event, S) -> revoke_stream(Event, S) end, S0, @@ -118,19 +132,23 @@ renew_streams(S0, #{agent := Agent0} = SharedSubS0) -> emqx_persistent_session_ds_state:t(), t() ) -> {emqx_persistent_session_ds_state:t(), t()}. -on_streams_replay(S, #{agent := Agent0} = SharedSubS0) -> +on_streams_replay(S, #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0) -> Progresses = stream_progresses(S), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( Agent0, Progresses ), - SharedSubS1 = SharedSubS0#{agent => Agent1}, + {Agent2, ScheduledActions1} = run_scheduled_actions(S, Agent1, ScheduledActions0), + SharedSubS1 = SharedSubS0#{ + agent => Agent2, + scheduled_actions => ScheduledActions1 + }, {S, SharedSubS1}. on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> S1 = revoke_all_streams(S0), Progresses = stream_progresses(S1), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_disconnect(Agent0, Progresses), - SharedSubS1 = SharedSubS0#{agent => Agent1}, + SharedSubS1 = SharedSubS0#{agent => Agent1, scheduled_actions => #{}}, {S1, SharedSubS1}. -spec on_info(emqx_persistent_session_ds_state:t(), t(), term()) -> @@ -149,9 +167,79 @@ to_map(_S, _SharedSubS) -> %% Internal functions %%-------------------------------------------------------------------- +run_scheduled_actions(S, Agent, ScheduledActions) -> + maps:fold( + fun(TopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) -> + case run_scheduled_action(S, AgentAcc0, TopicFilter, Action0) of + {ok, AgentAcc1} -> + {AgentAcc1, maps:remove(TopicFilter, ScheduledActionsAcc)}; + {continue, Action1} -> + {AgentAcc0, ScheduledActionsAcc#{TopicFilter => Action1}} + end + end, + {Agent, ScheduledActions}, + ScheduledActions + ). + +run_scheduled_action( + S, + Agent, + TopicFilter, + #{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action +) -> + StreamKeysToWait1 = lists:filter( + fun({_SubId, _Stream} = Key) -> + case emqx_persistent_session_ds_state:get_stream(Key, S) of + undefined -> + %% This should not happen: we should see any stream + %% in completed state before deletion + true; + SRS -> + not is_stream_fully_acked(S, SRS) + end + end, + StreamKeysToWait0 + ), + + Progresses1 = + lists:map( + fun({_SubId, Stream} = Key) -> + #srs{it_end = ItEnd} = SRS = emqx_persistent_session_ds_state:get_stream(Key, S), + #{ + stream => Stream, + iterator => ItEnd, + use_finished => is_use_finished(S, SRS) + } + end, + (StreamKeysToWait0 -- StreamKeysToWait1) + ) ++ Progresses0, + + case StreamKeysToWait1 of + [] -> + case Type of + {?schedule_subscribe, SubOpts} -> + {ok, + emqx_persistent_session_ds_shared_subs_agent:on_subscribe( + Agent, TopicFilter, SubOpts + )}; + ?schedule_unsubscribe -> + {ok, + emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe( + Agent, TopicFilter, Progresses1 + )} + end; + _ -> + {continue, Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}} + end. + stream_progresses(S) -> fold_shared_stream_states( - fun(TopicFilter, Stream, SRS, Acc) -> + fun( + #share{group = Group}, + Stream, + SRS, + ProgressesAcc0 + ) -> #srs{it_end = EndIt} = SRS, case is_stream_fully_acked(S, SRS) of @@ -159,17 +247,22 @@ stream_progresses(S) -> %% TODO %% Is it sufficient for a report? StreamProgress = #{ - topic_filter => TopicFilter, stream => Stream, iterator => EndIt, - use_finished => is_use_finished(S, SRS) + use_finished => is_use_finished(S, SRS), + is_fully_acked => true }, - [StreamProgress | Acc]; + maps:update_with( + Group, + fun(Progresses) -> [StreamProgress | Progresses] end, + [StreamProgress], + ProgressesAcc0 + ); false -> - Acc + ProgressesAcc0 end end, - [], + #{}, S ). @@ -222,14 +315,16 @@ on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> -dialyzer({nowarn_function, create_new_subscription/3}). create_new_subscription(TopicFilter, SubOpts, #{ - id := SessionId, s := S0, shared_sub_s := #{agent := Agent0} = SharedSubS0, props := Props + s := S0, + shared_sub_s := #{agent := Agent} = SharedSubS0, + props := Props }) -> case - emqx_persistent_session_ds_shared_subs_agent:on_subscribe( - Agent0, TopicFilter, SubOpts + emqx_persistent_session_ds_shared_subs_agent:can_subscribe( + Agent, TopicFilter, SubOpts ) of - {ok, Agent1} -> + ok -> #{upgrade_qos := UpgradeQoS} = Props, {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), @@ -247,10 +342,7 @@ create_new_subscription(TopicFilter, SubOpts, #{ S = emqx_persistent_session_ds_state:put_subscription( TopicFilter, Subscription, S3 ), - SharedSubS = SharedSubS0#{agent => Agent1}, - ?tp(persistent_session_ds_shared_subscription_added, #{ - topic_filter => TopicFilter, session => SessionId - }), + SharedSubS = schedule_subscribe(SharedSubS0, TopicFilter, SubOpts), {ok, S, SharedSubS}; {error, _} = Error -> Error @@ -289,15 +381,25 @@ lookup(TopicFilter, S) -> undefined end. +accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) -> + %% If we have a pending action (subscribe or unsubscribe) for this topic filter, + %% we should not accept a stream and start replay it. We won't use it anyway: + %% * if subscribe is pending, we will reset agent obtain a new lease + %% * if unsubscribe is pending, we will drop connection + case ScheduledActions of + #{TopicFilter := _Action} -> + S; + _ -> + accept_stream(Event, S) + end. + accept_stream( #{topic_filter := TopicFilter, stream := Stream, iterator := Iterator}, S0 ) -> case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> - %% This should not happen. - %% Agent should have received unsubscribe callback - %% and should not have passed this stream as a new one - error(new_stream_without_sub); + %% We unsubscribed + S0; #{id := SubId, current_state := SStateId} -> Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S0) of @@ -347,6 +449,57 @@ revoke_all_streams(S0) -> S0 ). +schedule_subscribe( + #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts +) -> + case ScheduledActions0 of + #{TopicFilter := ScheduledAction} -> + ScheduledActions1 = ScheduledActions0#{ + TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} + }, + SharedSubS0#{scheduled_actions := ScheduledActions1}; + _ -> + Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe( + Agent0, TopicFilter, SubOpts + ), + SharedSubS0#{agent => Agent1} + end. + +schedule_unsubscribe( + S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter +) -> + case ScheduledActions0 of + #{TopicFilter := ScheduledAction} -> + ScheduledActions1 = ScheduledActions0#{ + TopicFilter => ScheduledAction#{type => ?schedule_unsubscribe} + }, + SharedSubS0#{scheduled_actions := ScheduledActions1}; + _ -> + StreamIdsToFinalize = stream_ids_by_sub_id(S, UnsubscridedSubId), + ScheduledActions1 = ScheduledActions0#{ + TopicFilter => #{ + type => ?schedule_unsubscribe, + stream_keys_to_wait => StreamIdsToFinalize, + progresses => [] + } + }, + SharedSubS0#{scheduled_actions := ScheduledActions1} + end. + +stream_ids_by_sub_id(S, MatchSubId) -> + emqx_persistent_session_ds_state:fold_streams( + fun({SubId, _Stream} = StreamStateId, _SRS, StreamStateIds) -> + case SubId of + MatchSubId -> + [StreamStateId | StreamStateIds]; + _ -> + StreamStateIds + end + end, + [], + S + ). + -spec to_agent_subscription( emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription() ) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 72b4fa22d..b49ceabcf 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -59,9 +59,10 @@ -export([ new/1, open/2, + can_subscribe/3, on_subscribe/3, - on_unsubscribe/2, + on_unsubscribe/3, on_stream_progress/2, on_info/2, on_disconnect/2, @@ -80,12 +81,12 @@ -callback new(opts()) -> t(). -callback open([{topic_filter(), subscription()}], opts()) -> t(). --callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> - {ok, t()} | {error, term()}. --callback on_unsubscribe(t(), topic_filter()) -> t(). --callback on_disconnect(t(), [stream_progress()]) -> t(). +-callback can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. +-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t(). +-callback on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t(). +-callback on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). -callback renew_streams(t()) -> {[stream_lease_event()], t()}. --callback on_stream_progress(t(), [stream_progress()]) -> t(). +-callback on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). -callback on_info(t(), term()) -> t(). %%-------------------------------------------------------------------- @@ -100,16 +101,19 @@ new(Opts) -> open(Topics, Opts) -> ?shared_subs_agent:open(Topics, Opts). --spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> - {ok, t()} | {error, emqx_types:reason_code()}. +-spec can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. +can_subscribe(Agent, TopicFilter, SubOpts) -> + ?shared_subs_agent:can_subscribe(Agent, TopicFilter, SubOpts). + +-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t(). on_subscribe(Agent, TopicFilter, SubOpts) -> ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts). --spec on_unsubscribe(t(), topic_filter()) -> t(). -on_unsubscribe(Agent, TopicFilter) -> - ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter). +-spec on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t(). +on_unsubscribe(Agent, TopicFilter, StreamProgresses) -> + ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter, StreamProgresses). --spec on_disconnect(t(), [stream_progress()]) -> t(). +-spec on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). on_disconnect(Agent, StreamProgresses) -> ?shared_subs_agent:on_disconnect(Agent, StreamProgresses). @@ -117,7 +121,7 @@ on_disconnect(Agent, StreamProgresses) -> renew_streams(Agent) -> ?shared_subs_agent:renew_streams(Agent). --spec on_stream_progress(t(), [stream_progress()]) -> t(). +-spec on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). on_stream_progress(Agent, StreamProgress) -> ?shared_subs_agent:on_stream_progress(Agent, StreamProgress). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl index d984194a8..8156db76d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_null_agent.erl @@ -9,9 +9,10 @@ -export([ new/1, open/2, + can_subscribe/3, on_subscribe/3, - on_unsubscribe/2, + on_unsubscribe/3, on_stream_progress/2, on_info/2, on_disconnect/2, @@ -31,10 +32,13 @@ new(_Opts) -> open(_Topics, _Opts) -> undefined. -on_subscribe(_Agent, _TopicFilter, _SubOpts) -> +can_subscribe(_Agent, _TopicFilter, _SubOpts) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}. -on_unsubscribe(Agent, _TopicFilter) -> +on_subscribe(Agent, _TopicFilter, _SubOpts) -> + Agent. + +on_unsubscribe(Agent, _TopicFilter, _Progresses) -> Agent. on_disconnect(Agent, _) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 0e8d17614..70b203661 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -12,9 +12,10 @@ -export([ new/1, open/2, + can_subscribe/3, on_subscribe/3, - on_unsubscribe/2, + on_unsubscribe/3, on_stream_progress/2, on_info/2, on_disconnect/2, @@ -47,40 +48,38 @@ open(TopicSubscriptions, Opts) -> ), State1. -on_subscribe(State0, TopicFilter, _SubOpts) -> - State1 = add_group_subscription(State0, TopicFilter), - {ok, State1}. +can_subscribe(_State, _TopicFilter, _SubOpts) -> + ok. -on_unsubscribe(State, TopicFilter) -> - delete_group_subscription(State, TopicFilter). +on_subscribe(State0, TopicFilter, _SubOpts) -> + add_group_subscription(State0, TopicFilter). + +on_unsubscribe(State, TopicFilter, GroupProgress) -> + delete_group_subscription(State, TopicFilter, GroupProgress). renew_streams(#{} = State) -> fetch_stream_events(State). on_stream_progress(State, StreamProgresses) -> - ProgressesByGroup = stream_progresses_by_group(StreamProgresses), - lists:foldl( - fun({Group, GroupProgresses}, StateAcc) -> + maps:fold( + fun(Group, GroupProgresses, StateAcc) -> with_group_sm(StateAcc, Group, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses) end) end, State, - maps:to_list(ProgressesByGroup) + StreamProgresses ). on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> - ProgressesByGroup = stream_progresses_by_group(StreamProgresses), - Groups1 = maps:fold( - fun(Group, GroupSM0, GroupsAcc) -> - GroupProgresses = maps:get(Group, ProgressesByGroup, []), - GroupSM1 = emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses), - GroupsAcc#{Group => GroupSM1} + ok = maps:foreach( + fun(Group, GroupSM0) -> + GroupProgresses = maps:get(Group, StreamProgresses, []), + emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses) end, - #{}, Groups0 ), - State#{groups => Groups1}. + State#{groups => #{}}. on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) -> ?SLOG(info, #{ @@ -152,9 +151,14 @@ init_state(Opts) -> groups => #{} }. -delete_group_subscription(State, _ShareTopicFilter) -> - %% TODO https://emqx.atlassian.net/browse/EMQX-12572 - State. +delete_group_subscription(State, #share{group = Group}, GroupProgress) -> + case State of + #{groups := #{Group := GSM} = Groups} -> + _ = emqx_ds_shared_sub_group_sm:handle_disconnect(GSM, GroupProgress), + State#{groups => maps:remove(Group, Groups)}; + _ -> + State + end. add_group_subscription( #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter @@ -209,20 +213,3 @@ with_group_sm(State, Group, Fun) -> %% Error? State end. - -stream_progresses_by_group(StreamProgresses) -> - lists:foldl( - fun(#{topic_filter := #share{group = Group}} = Progress0, Acc) -> - Progress1 = maps:remove(topic_filter, Progress0), - maps:update_with( - Group, - fun(GroupStreams0) -> - [Progress1 | GroupStreams0] - end, - [Progress1], - Acc - ) - end, - #{}, - StreamProgresses - ). diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 3e80b44a9..defc90c78 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -221,35 +221,7 @@ t_intensive_reassign(_Config) -> end end, - Messages = lists:foldl( - fun(#{payload := Payload, client_pid := Pid}, Acc) -> - maps:update_with( - binary_to_integer(Payload), - fun(Clients) -> - [ClientByBid(Pid) | Clients] - end, - [ClientByBid(Pid)], - Acc - ) - end, - #{}, - Pubs - ), - - Missing = lists:filter( - fun(N) -> not maps:is_key(N, Messages) end, - lists:seq(1, 2 * NPubs) - ), - Duplicate = lists:filtermap( - fun(N) -> - case Messages of - #{N := [_]} -> false; - #{N := [_ | _] = Clients} -> {true, {N, Clients}}; - _ -> false - end - end, - lists:seq(1, 2 * NPubs) - ), + {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), ?assertEqual( [], @@ -266,6 +238,58 @@ t_intensive_reassign(_Config) -> ok = emqtt:disconnect(ConnShared3), ok = emqtt:disconnect(ConnPub). +t_unsubscribe(_Config) -> + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr9/topic9/#">>, 1), + + ct:sleep(1000), + + NPubs = 10_000, + + Topics = [<<"topic9/1">>, <<"topic9/2">>, <<"topic9/3">>], + ok = publish_n(ConnPub, Topics, 1, NPubs), + + Self = self(), + _ = spawn_link(fun() -> + ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs), + Self ! publish_done + end), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr9/topic9/#">>, 1), + {ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr9/topic9/#">>), + + receive + publish_done -> ok + end, + + Pubs = drain_publishes(), + + ClientByBid = fun(Pid) -> + case Pid of + ConnShared1 -> <<"client_shared1">>; + ConnShared2 -> <<"client_shared2">> + end + end, + + {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), + + ?assertEqual( + [], + Missing + ), + + ?assertEqual( + [], + Duplicate + ), + + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), @@ -364,3 +388,36 @@ drain_publishes(Acc) -> after 5_000 -> lists:reverse(Acc) end. + +verify_received_pubs(Pubs, NPubs, ClientByBid) -> + Messages = lists:foldl( + fun(#{payload := Payload, client_pid := Pid}, Acc) -> + maps:update_with( + binary_to_integer(Payload), + fun(Clients) -> + [ClientByBid(Pid) | Clients] + end, + [ClientByBid(Pid)], + Acc + ) + end, + #{}, + Pubs + ), + + Missing = lists:filter( + fun(N) -> not maps:is_key(N, Messages) end, + lists:seq(1, NPubs) + ), + Duplicate = lists:filtermap( + fun(N) -> + case Messages of + #{N := [_]} -> false; + #{N := [_ | _] = Clients} -> {true, {N, Clients}}; + _ -> false + end + end, + lists:seq(1, NPubs) + ), + + {Missing, Duplicate}.