Merge pull request #12485 from ieQu1/dev/ds-fix-unsubscribe
fix(ds): Fix unsubscribe logic related to the replay
This commit is contained in:
commit
811edb32a2
|
@ -117,7 +117,7 @@
|
||||||
id := subscription_id(),
|
id := subscription_id(),
|
||||||
start_time := emqx_ds:time(),
|
start_time := emqx_ds:time(),
|
||||||
props := map(),
|
props := map(),
|
||||||
extra := map()
|
deleted := boolean()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-define(TIMER_PULL, timer_pull).
|
-define(TIMER_PULL, timer_pull).
|
||||||
|
@ -209,7 +209,7 @@ info(created_at, #{s := S}) ->
|
||||||
info(is_persistent, #{}) ->
|
info(is_persistent, #{}) ->
|
||||||
true;
|
true;
|
||||||
info(subscriptions, #{s := S}) ->
|
info(subscriptions, #{s := S}) ->
|
||||||
subs_to_map(S);
|
emqx_persistent_session_ds_subs:to_map(S);
|
||||||
info(subscriptions_cnt, #{s := S}) ->
|
info(subscriptions_cnt, #{s := S}) ->
|
||||||
emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S));
|
emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S));
|
||||||
info(subscriptions_max, #{props := Conf}) ->
|
info(subscriptions_max, #{props := Conf}) ->
|
||||||
|
@ -280,7 +280,7 @@ subscribe(
|
||||||
SubOpts,
|
SubOpts,
|
||||||
Session = #{id := ID, s := S0}
|
Session = #{id := ID, s := S0}
|
||||||
) ->
|
) ->
|
||||||
case subs_lookup(TopicFilter, S0) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
|
||||||
undefined ->
|
undefined ->
|
||||||
%% TODO: max subscriptions
|
%% TODO: max subscriptions
|
||||||
|
|
||||||
|
@ -313,7 +313,8 @@ subscribe(
|
||||||
Subscription = #{
|
Subscription = #{
|
||||||
start_time => now_ms(),
|
start_time => now_ms(),
|
||||||
props => SubOpts,
|
props => SubOpts,
|
||||||
id => SubId
|
id => SubId,
|
||||||
|
deleted => false
|
||||||
},
|
},
|
||||||
IsNew = true;
|
IsNew = true;
|
||||||
Subscription0 = #{} ->
|
Subscription0 = #{} ->
|
||||||
|
@ -321,7 +322,7 @@ subscribe(
|
||||||
IsNew = false,
|
IsNew = false,
|
||||||
S1 = S0
|
S1 = S0
|
||||||
end,
|
end,
|
||||||
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1),
|
S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1),
|
||||||
?tp(persistent_session_ds_subscription_added, #{
|
?tp(persistent_session_ds_subscription_added, #{
|
||||||
topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
|
topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
|
||||||
}),
|
}),
|
||||||
|
@ -333,7 +334,7 @@ unsubscribe(
|
||||||
TopicFilter,
|
TopicFilter,
|
||||||
Session = #{id := ID, s := S0}
|
Session = #{id := ID, s := S0}
|
||||||
) ->
|
) ->
|
||||||
case subs_lookup(TopicFilter, S0) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
||||||
Subscription = #{props := SubOpts} ->
|
Subscription = #{props := SubOpts} ->
|
||||||
|
@ -343,12 +344,12 @@ unsubscribe(
|
||||||
|
|
||||||
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
|
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
|
||||||
emqx_persistent_session_ds_state:t().
|
emqx_persistent_session_ds_state:t().
|
||||||
do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
|
do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) ->
|
||||||
S1 = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0),
|
S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0),
|
||||||
?tp(persistent_session_ds_subscription_delete, #{
|
?tp(persistent_session_ds_subscription_delete, #{
|
||||||
session_id => SessionId, topic_filter => TopicFilter
|
session_id => SessionId, topic_filter => TopicFilter
|
||||||
}),
|
}),
|
||||||
S = emqx_persistent_session_ds_stream_scheduler:del_subscription(SubId, S1),
|
S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
|
||||||
?tp_span(
|
?tp_span(
|
||||||
persistent_session_ds_subscription_route_delete,
|
persistent_session_ds_subscription_route_delete,
|
||||||
#{session_id => SessionId, topic_filter => TopicFilter},
|
#{session_id => SessionId, topic_filter => TopicFilter},
|
||||||
|
@ -359,7 +360,7 @@ do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) ->
|
||||||
-spec get_subscription(topic_filter(), session()) ->
|
-spec get_subscription(topic_filter(), session()) ->
|
||||||
emqx_types:subopts() | undefined.
|
emqx_types:subopts() | undefined.
|
||||||
get_subscription(TopicFilter, #{s := S}) ->
|
get_subscription(TopicFilter, #{s := S}) ->
|
||||||
case subs_lookup(TopicFilter, S) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of
|
||||||
_Subscription = #{props := SubOpts} ->
|
_Subscription = #{props := SubOpts} ->
|
||||||
SubOpts;
|
SubOpts;
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -459,7 +460,8 @@ handle_timeout(
|
||||||
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
||||||
{ok, Publishes, Session};
|
{ok, Publishes, Session};
|
||||||
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
|
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
|
||||||
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S0),
|
S1 = emqx_persistent_session_ds_subs:gc(S0),
|
||||||
|
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
||||||
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
|
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
|
||||||
Session = emqx_session:ensure_timer(
|
Session = emqx_session:ensure_timer(
|
||||||
?TIMER_GET_STREAMS,
|
?TIMER_GET_STREAMS,
|
||||||
|
@ -679,7 +681,7 @@ session_drop(ID, Reason) ->
|
||||||
case emqx_persistent_session_ds_state:open(ID) of
|
case emqx_persistent_session_ds_state:open(ID) of
|
||||||
{ok, S0} ->
|
{ok, S0} ->
|
||||||
?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
|
?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
|
||||||
_S = subs_fold(
|
_S = emqx_persistent_session_ds_subs:fold(
|
||||||
fun(TopicFilter, Subscription, S) ->
|
fun(TopicFilter, Subscription, S) ->
|
||||||
do_unsubscribe(ID, TopicFilter, Subscription, S)
|
do_unsubscribe(ID, TopicFilter, Subscription, S)
|
||||||
end,
|
end,
|
||||||
|
@ -897,27 +899,6 @@ do_drain_buffer(Inflight0, S0, Acc) ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
subs_lookup(TopicFilter, S) ->
|
|
||||||
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
|
||||||
emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined).
|
|
||||||
|
|
||||||
subs_to_map(S) ->
|
|
||||||
subs_fold(
|
|
||||||
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
|
|
||||||
#{},
|
|
||||||
S
|
|
||||||
).
|
|
||||||
|
|
||||||
subs_fold(Fun, AccIn, S) ->
|
|
||||||
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
|
||||||
emqx_topic_gbt:fold(
|
|
||||||
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
|
|
||||||
AccIn,
|
|
||||||
Subs
|
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% TODO: find a more reliable way to perform actions that have side
|
%% TODO: find a more reliable way to perform actions that have side
|
||||||
%% effects. Add `CBM:init' callback to the session behavior?
|
%% effects. Add `CBM:init' callback to the session behavior?
|
||||||
-spec ensure_timers(session()) -> session().
|
-spec ensure_timers(session()) -> session().
|
||||||
|
|
|
@ -62,7 +62,10 @@
|
||||||
first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
%% Sequence numbers that have to be committed for the batch:
|
%% Sequence numbers that have to be committed for the batch:
|
||||||
last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
|
last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno()
|
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
|
||||||
|
%% This stream belongs to an unsubscribed topic-filter, and is
|
||||||
|
%% marked for deletion:
|
||||||
|
unsubscribed = false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% Session metadata keys:
|
%% Session metadata keys:
|
||||||
|
|
|
@ -16,8 +16,8 @@
|
||||||
-module(emqx_persistent_session_ds_stream_scheduler).
|
-module(emqx_persistent_session_ds_stream_scheduler).
|
||||||
|
|
||||||
%% API:
|
%% API:
|
||||||
-export([find_new_streams/1, find_replay_streams/1]).
|
-export([find_new_streams/1, find_replay_streams/1, is_fully_acked/2]).
|
||||||
-export([renew_streams/1, del_subscription/2]).
|
-export([renew_streams/1, on_unsubscribe/2]).
|
||||||
|
|
||||||
%% behavior callbacks:
|
%% behavior callbacks:
|
||||||
-export([]).
|
-export([]).
|
||||||
|
@ -93,7 +93,7 @@ find_new_streams(S) ->
|
||||||
(_Key, #srs{it_end = end_of_stream}, Acc) ->
|
(_Key, #srs{it_end = end_of_stream}, Acc) ->
|
||||||
Acc;
|
Acc;
|
||||||
(Key, Stream, Acc) ->
|
(Key, Stream, Acc) ->
|
||||||
case is_fully_acked(Comm1, Comm2, Stream) of
|
case is_fully_acked(Comm1, Comm2, Stream) andalso not Stream#srs.unsubscribed of
|
||||||
true ->
|
true ->
|
||||||
[{Key, Stream} | Acc];
|
[{Key, Stream} | Acc];
|
||||||
false ->
|
false ->
|
||||||
|
@ -124,37 +124,63 @@ find_new_streams(S) ->
|
||||||
%% This way, messages from the same topic/shard are never reordered.
|
%% This way, messages from the same topic/shard are never reordered.
|
||||||
-spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
-spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
||||||
renew_streams(S0) ->
|
renew_streams(S0) ->
|
||||||
S1 = remove_fully_replayed_streams(S0),
|
S1 = remove_unsubscribed_streams(S0),
|
||||||
emqx_topic_gbt:fold(
|
S2 = remove_fully_replayed_streams(S1),
|
||||||
fun(Key, _Subscription = #{start_time := StartTime, id := SubId}, S2) ->
|
emqx_persistent_session_ds_subs:fold(
|
||||||
TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
|
fun
|
||||||
Streams = select_streams(
|
(Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) ->
|
||||||
SubId,
|
TopicFilter = emqx_topic:words(Key),
|
||||||
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
Streams = select_streams(
|
||||||
S2
|
SubId,
|
||||||
),
|
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
||||||
lists:foldl(
|
Acc
|
||||||
fun(I, Acc) ->
|
),
|
||||||
ensure_iterator(TopicFilter, StartTime, SubId, I, Acc)
|
lists:foldl(
|
||||||
end,
|
fun(I, Acc1) ->
|
||||||
S2,
|
ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1)
|
||||||
Streams
|
end,
|
||||||
)
|
Acc,
|
||||||
|
Streams
|
||||||
|
);
|
||||||
|
(_Key, _DeletedSubscription, Acc) ->
|
||||||
|
Acc
|
||||||
end,
|
end,
|
||||||
S1,
|
S2,
|
||||||
emqx_persistent_session_ds_state:get_subscriptions(S1)
|
S2
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec del_subscription(
|
-spec on_unsubscribe(
|
||||||
emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
|
emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
|
||||||
) ->
|
) ->
|
||||||
emqx_persistent_session_ds_state:t().
|
emqx_persistent_session_ds_state:t().
|
||||||
del_subscription(SubId, S0) ->
|
on_unsubscribe(SubId, S0) ->
|
||||||
|
%% NOTE: this function only marks the streams for deletion,
|
||||||
|
%% instead of outright deleting them.
|
||||||
|
%%
|
||||||
|
%% It's done for two reasons:
|
||||||
|
%%
|
||||||
|
%% - MQTT standard states that the broker MUST process acks for
|
||||||
|
%% all sent messages, and it MAY keep on sending buffered
|
||||||
|
%% messages:
|
||||||
|
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901186
|
||||||
|
%%
|
||||||
|
%% - Deleting the streams may lead to gaps in the sequence number
|
||||||
|
%% series, and lead to problems with acknowledgement tracking, we
|
||||||
|
%% avoid that by delaying the deletion.
|
||||||
|
%%
|
||||||
|
%% When the stream is marked for deletion, the session won't fetch
|
||||||
|
%% _new_ batches from it. Actual deletion is done by
|
||||||
|
%% `renew_streams', when it detects that all in-flight messages
|
||||||
|
%% from the stream have been acked by the client.
|
||||||
emqx_persistent_session_ds_state:fold_streams(
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
fun(Key, _, Acc) ->
|
fun(Key, Srs, Acc) ->
|
||||||
case Key of
|
case Key of
|
||||||
{SubId, _Stream} ->
|
{SubId, _Stream} ->
|
||||||
emqx_persistent_session_ds_state:del_stream(Key, Acc);
|
%% This stream belongs to a deleted subscription.
|
||||||
|
%% Mark for deletion:
|
||||||
|
emqx_persistent_session_ds_state:put_stream(
|
||||||
|
Key, Srs#srs{unsubscribed = true}, Acc
|
||||||
|
);
|
||||||
_ ->
|
_ ->
|
||||||
Acc
|
Acc
|
||||||
end
|
end
|
||||||
|
@ -163,12 +189,19 @@ del_subscription(SubId, S0) ->
|
||||||
S0
|
S0
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-spec is_fully_acked(
|
||||||
|
emqx_persistent_session_ds:stream_state(), emqx_persistent_session_ds_state:t()
|
||||||
|
) -> boolean().
|
||||||
|
is_fully_acked(Srs, S) ->
|
||||||
|
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
|
||||||
|
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
|
||||||
|
is_fully_acked(CommQos1, CommQos2, Srs).
|
||||||
|
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%================================================================================
|
%%================================================================================
|
||||||
|
|
||||||
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
|
||||||
%% TODO: hash collisions
|
|
||||||
Key = {SubId, Stream},
|
Key = {SubId, Stream},
|
||||||
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
case emqx_persistent_session_ds_state:get_stream(Key, S) of
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -223,6 +256,27 @@ select_streams(SubId, RankX, Streams0, S) ->
|
||||||
lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
|
lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Remove fully acked streams for the deleted subscriptions.
|
||||||
|
-spec remove_unsubscribed_streams(emqx_persistent_session_ds_state:t()) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
remove_unsubscribed_streams(S0) ->
|
||||||
|
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
|
||||||
|
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
|
||||||
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
|
fun(Key, ReplayState, S1) ->
|
||||||
|
case
|
||||||
|
ReplayState#srs.unsubscribed andalso is_fully_acked(CommQos1, CommQos2, ReplayState)
|
||||||
|
of
|
||||||
|
true ->
|
||||||
|
emqx_persistent_session_ds_state:del_stream(Key, S1);
|
||||||
|
false ->
|
||||||
|
S1
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
S0,
|
||||||
|
S0
|
||||||
|
).
|
||||||
|
|
||||||
%% @doc Advance RankY for each RankX that doesn't have any unreplayed
|
%% @doc Advance RankY for each RankX that doesn't have any unreplayed
|
||||||
%% streams.
|
%% streams.
|
||||||
%%
|
%%
|
||||||
|
@ -303,6 +357,12 @@ compare_streams(
|
||||||
is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) ->
|
is_fully_replayed(Comm1, Comm2, S = #srs{it_end = It}) ->
|
||||||
It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
|
It =:= end_of_stream andalso is_fully_acked(Comm1, Comm2, S).
|
||||||
|
|
||||||
|
is_fully_acked(_, _, #srs{
|
||||||
|
first_seqno_qos1 = Q1, last_seqno_qos1 = Q1, first_seqno_qos2 = Q2, last_seqno_qos2 = Q2
|
||||||
|
}) ->
|
||||||
|
%% Streams where the last chunk doesn't contain any QoS1 and 2
|
||||||
|
%% messages are considered fully acked:
|
||||||
|
true;
|
||||||
is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
|
is_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) ->
|
||||||
(Comm1 >= S1) andalso (Comm2 >= S2).
|
(Comm1 >= S1) andalso (Comm2 >= S2).
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc This module encapsulates the data related to the client's
|
||||||
|
%% subscriptions. It tries to reppresent the subscriptions as if they
|
||||||
|
%% were a simple key-value map.
|
||||||
|
%%
|
||||||
|
%% In reality, however, the session has to retain old the
|
||||||
|
%% subscriptions for longer to ensure the consistency of message
|
||||||
|
%% replay.
|
||||||
|
-module(emqx_persistent_session_ds_subs).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]).
|
||||||
|
|
||||||
|
-export_type([]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Type declarations
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%% @doc Process a new subscription
|
||||||
|
-spec on_subscribe(
|
||||||
|
emqx_persistent_session_ds:topic_filter(),
|
||||||
|
emqx_persistent_session_ds:subscription(),
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
on_subscribe(TopicFilter, Subscription, S) ->
|
||||||
|
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S).
|
||||||
|
|
||||||
|
%% @doc Process UNSUBSCRIBE
|
||||||
|
-spec on_unsubscribe(
|
||||||
|
emqx_persistent_session_ds:topic_filter(),
|
||||||
|
emqx_persistent_session_ds:subscription(),
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
on_unsubscribe(TopicFilter, Subscription0, S0) ->
|
||||||
|
%% Note: we cannot delete the subscription immediately, since its
|
||||||
|
%% metadata can be used during replay (see `process_batch'). We
|
||||||
|
%% instead mark it as deleted, and let `subscription_gc' function
|
||||||
|
%% dispatch it later:
|
||||||
|
Subscription = Subscription0#{deleted => true},
|
||||||
|
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0).
|
||||||
|
|
||||||
|
%% @doc Remove subscriptions that have been marked for deletion, and
|
||||||
|
%% that don't have any unacked messages:
|
||||||
|
-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
||||||
|
gc(S0) ->
|
||||||
|
fold_all(
|
||||||
|
fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) ->
|
||||||
|
case Deleted andalso has_no_unacked_streams(SubId, S0) of
|
||||||
|
true ->
|
||||||
|
emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
S0,
|
||||||
|
S0
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over active subscriptions:
|
||||||
|
-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
|
||||||
|
emqx_persistent_session_ds:subscription() | undefined.
|
||||||
|
lookup(TopicFilter, S) ->
|
||||||
|
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
||||||
|
case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
|
||||||
|
#{deleted := true} ->
|
||||||
|
undefined;
|
||||||
|
Sub ->
|
||||||
|
Sub
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Convert active subscriptions to a map, for information
|
||||||
|
%% purpose:
|
||||||
|
-spec to_map(emqx_persistent_session_ds_state:t()) -> map().
|
||||||
|
to_map(S) ->
|
||||||
|
fold(
|
||||||
|
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
|
||||||
|
#{},
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over active subscriptions:
|
||||||
|
-spec fold(
|
||||||
|
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
|
||||||
|
Acc,
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
Acc.
|
||||||
|
fold(Fun, AccIn, S) ->
|
||||||
|
fold_all(
|
||||||
|
fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
|
||||||
|
case Deleted of
|
||||||
|
true -> Acc;
|
||||||
|
false -> Fun(TopicFilter, Sub, Acc)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
AccIn,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over all subscriptions, including inactive ones:
|
||||||
|
-spec fold_all(
|
||||||
|
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
|
||||||
|
Acc,
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
Acc.
|
||||||
|
fold_all(Fun, AccIn, S) ->
|
||||||
|
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
||||||
|
emqx_topic_gbt:fold(
|
||||||
|
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
|
||||||
|
AccIn,
|
||||||
|
Subs
|
||||||
|
).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec has_no_unacked_streams(
|
||||||
|
emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
|
||||||
|
) -> boolean().
|
||||||
|
has_no_unacked_streams(SubId, S) ->
|
||||||
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
|
fun
|
||||||
|
({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
|
||||||
|
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
|
||||||
|
(_StreamKey, _Srs, Acc) ->
|
||||||
|
Acc
|
||||||
|
end,
|
||||||
|
true,
|
||||||
|
S
|
||||||
|
).
|
|
@ -278,7 +278,10 @@ publish_many(Messages) ->
|
||||||
|
|
||||||
publish_many(Messages, WaitForUnregister) ->
|
publish_many(Messages, WaitForUnregister) ->
|
||||||
Fun = fun(Client, Message) ->
|
Fun = fun(Client, Message) ->
|
||||||
{ok, _} = emqtt:publish(Client, Message)
|
case emqtt:publish(Client, Message) of
|
||||||
|
ok -> ok;
|
||||||
|
{ok, _} -> ok
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
do_publish(Messages, Fun, WaitForUnregister).
|
do_publish(Messages, Fun, WaitForUnregister).
|
||||||
|
|
||||||
|
@ -1026,6 +1029,80 @@ t_unsubscribe(Config) ->
|
||||||
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
|
||||||
ok = emqtt:disconnect(Client).
|
ok = emqtt:disconnect(Client).
|
||||||
|
|
||||||
|
%% This testcase verifies that un-acked messages that were once sent
|
||||||
|
%% to the client are still retransmitted after the session
|
||||||
|
%% unsubscribes from the topic and reconnects.
|
||||||
|
t_unsubscribe_replay(Config) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
TopicPrefix = ?config(topic, Config),
|
||||||
|
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
ClientOpts = [
|
||||||
|
{proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 30, 'Receive-Maximum' => 10}},
|
||||||
|
{max_inflight, 10}
|
||||||
|
| Config
|
||||||
|
],
|
||||||
|
{ok, Sub} = emqtt:start_link([{clean_start, true}, {auto_ack, never} | ClientOpts]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Sub),
|
||||||
|
%% 1. Make two subscriptions, one is to be deleted:
|
||||||
|
Topic1 = iolist_to_binary([TopicPrefix, $/, <<"unsub">>]),
|
||||||
|
Topic2 = iolist_to_binary([TopicPrefix, $/, <<"sub">>]),
|
||||||
|
?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic1, qos2)),
|
||||||
|
?assertMatch({ok, _, _}, emqtt:subscribe(Sub, Topic2, qos2)),
|
||||||
|
%% 2. Publish 2 messages to the first and second topics each
|
||||||
|
%% (client doesn't ack them):
|
||||||
|
ok = publish(Topic1, <<"1">>, ?QOS_1),
|
||||||
|
ok = publish(Topic1, <<"2">>, ?QOS_2),
|
||||||
|
[Msg1, Msg2] = receive_messages(2),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{payload := <<"1">>},
|
||||||
|
#{payload := <<"2">>}
|
||||||
|
],
|
||||||
|
[Msg1, Msg2]
|
||||||
|
),
|
||||||
|
ok = publish(Topic2, <<"3">>, ?QOS_1),
|
||||||
|
ok = publish(Topic2, <<"4">>, ?QOS_2),
|
||||||
|
[Msg3, Msg4] = receive_messages(2),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
#{payload := <<"3">>},
|
||||||
|
#{payload := <<"4">>}
|
||||||
|
],
|
||||||
|
[Msg3, Msg4]
|
||||||
|
),
|
||||||
|
%% 3. Unsubscribe from the topic and disconnect:
|
||||||
|
?assertMatch({ok, _, _}, emqtt:unsubscribe(Sub, Topic1)),
|
||||||
|
ok = emqtt:disconnect(Sub),
|
||||||
|
%% 5. Publish more messages to the disconnected topic:
|
||||||
|
ok = publish(Topic1, <<"5">>, ?QOS_1),
|
||||||
|
ok = publish(Topic1, <<"6">>, ?QOS_2),
|
||||||
|
%% 4. Reconnect the client. It must only receive only four
|
||||||
|
%% messages from the time when it was subscribed:
|
||||||
|
{ok, Sub1} = emqtt:start_link([{clean_start, false}, {auto_ack, true} | ClientOpts]),
|
||||||
|
?assertMatch({ok, _}, emqtt:ConnFun(Sub1)),
|
||||||
|
%% Note: we ask for 6 messages, but expect only 4, it's
|
||||||
|
%% intentional:
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
Topic1 := [<<"1">>, <<"2">>],
|
||||||
|
Topic2 := [<<"3">>, <<"4">>]
|
||||||
|
},
|
||||||
|
get_topicwise_order(receive_messages(6, 5_000)),
|
||||||
|
debug_info(ClientId)
|
||||||
|
),
|
||||||
|
%% 5. Now let's resubscribe, and check that the session can receive new messages:
|
||||||
|
?assertMatch({ok, _, _}, emqtt:subscribe(Sub1, Topic1, qos2)),
|
||||||
|
ok = publish(Topic1, <<"7">>, ?QOS_0),
|
||||||
|
ok = publish(Topic1, <<"8">>, ?QOS_1),
|
||||||
|
ok = publish(Topic1, <<"9">>, ?QOS_2),
|
||||||
|
?assertMatch(
|
||||||
|
[<<"7">>, <<"8">>, <<"9">>],
|
||||||
|
lists:map(fun get_msgpub_payload/1, receive_messages(3))
|
||||||
|
),
|
||||||
|
ok = emqtt:disconnect(Sub1).
|
||||||
|
|
||||||
t_multiple_subscription_matches(Config) ->
|
t_multiple_subscription_matches(Config) ->
|
||||||
ConnFun = ?config(conn_fun, Config),
|
ConnFun = ?config(conn_fun, Config),
|
||||||
Topic = ?config(topic, Config),
|
Topic = ?config(topic, Config),
|
||||||
|
|
Loading…
Reference in New Issue