refactor(sessds): Extract subscription mgmt logic to separate module
This commit is contained in:
parent
3000a8f286
commit
19c6d1127f
|
@ -209,7 +209,7 @@ info(created_at, #{s := S}) ->
|
||||||
info(is_persistent, #{}) ->
|
info(is_persistent, #{}) ->
|
||||||
true;
|
true;
|
||||||
info(subscriptions, #{s := S}) ->
|
info(subscriptions, #{s := S}) ->
|
||||||
subs_to_map(S);
|
emqx_persistent_session_ds_subs:to_map(S);
|
||||||
info(subscriptions_cnt, #{s := S}) ->
|
info(subscriptions_cnt, #{s := S}) ->
|
||||||
emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S));
|
emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S));
|
||||||
info(subscriptions_max, #{props := Conf}) ->
|
info(subscriptions_max, #{props := Conf}) ->
|
||||||
|
@ -280,7 +280,7 @@ subscribe(
|
||||||
SubOpts,
|
SubOpts,
|
||||||
Session = #{id := ID, s := S0}
|
Session = #{id := ID, s := S0}
|
||||||
) ->
|
) ->
|
||||||
case subs_lookup(TopicFilter, S0) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
|
||||||
undefined ->
|
undefined ->
|
||||||
%% TODO: max subscriptions
|
%% TODO: max subscriptions
|
||||||
|
|
||||||
|
@ -322,7 +322,7 @@ subscribe(
|
||||||
IsNew = false,
|
IsNew = false,
|
||||||
S1 = S0
|
S1 = S0
|
||||||
end,
|
end,
|
||||||
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1),
|
S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1),
|
||||||
?tp(persistent_session_ds_subscription_added, #{
|
?tp(persistent_session_ds_subscription_added, #{
|
||||||
topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
|
topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
|
||||||
}),
|
}),
|
||||||
|
@ -334,7 +334,7 @@ unsubscribe(
|
||||||
TopicFilter,
|
TopicFilter,
|
||||||
Session = #{id := ID, s := S0}
|
Session = #{id := ID, s := S0}
|
||||||
) ->
|
) ->
|
||||||
case subs_lookup(TopicFilter, S0) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
|
||||||
Subscription = #{props := SubOpts} ->
|
Subscription = #{props := SubOpts} ->
|
||||||
|
@ -344,13 +344,8 @@ unsubscribe(
|
||||||
|
|
||||||
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
|
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
|
||||||
emqx_persistent_session_ds_state:t().
|
emqx_persistent_session_ds_state:t().
|
||||||
do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) ->
|
do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) ->
|
||||||
%% Note: we cannot delete the subscription immediately, since its
|
S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0),
|
||||||
%% metadata can be used during replay (see `process_batch'). We
|
|
||||||
%% instead mark it as deleted, and let `subscription_gc' function
|
|
||||||
%% dispatch it later:
|
|
||||||
SubMeta = SubMeta0#{deleted => true},
|
|
||||||
S1 = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], SubMeta, S0),
|
|
||||||
?tp(persistent_session_ds_subscription_delete, #{
|
?tp(persistent_session_ds_subscription_delete, #{
|
||||||
session_id => SessionId, topic_filter => TopicFilter
|
session_id => SessionId, topic_filter => TopicFilter
|
||||||
}),
|
}),
|
||||||
|
@ -365,7 +360,7 @@ do_unsubscribe(SessionId, TopicFilter, SubMeta0 = #{id := SubId}, S0) ->
|
||||||
-spec get_subscription(topic_filter(), session()) ->
|
-spec get_subscription(topic_filter(), session()) ->
|
||||||
emqx_types:subopts() | undefined.
|
emqx_types:subopts() | undefined.
|
||||||
get_subscription(TopicFilter, #{s := S}) ->
|
get_subscription(TopicFilter, #{s := S}) ->
|
||||||
case subs_lookup(TopicFilter, S) of
|
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of
|
||||||
_Subscription = #{props := SubOpts} ->
|
_Subscription = #{props := SubOpts} ->
|
||||||
SubOpts;
|
SubOpts;
|
||||||
undefined ->
|
undefined ->
|
||||||
|
@ -465,7 +460,7 @@ handle_timeout(
|
||||||
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
Session = emqx_session:ensure_timer(?TIMER_PULL, Timeout, Session1),
|
||||||
{ok, Publishes, Session};
|
{ok, Publishes, Session};
|
||||||
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
|
handle_timeout(_ClientInfo, ?TIMER_GET_STREAMS, Session0 = #{s := S0}) ->
|
||||||
S1 = subscription_gc(S0),
|
S1 = emqx_persistent_session_ds_subs:gc(S0),
|
||||||
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
S = emqx_persistent_session_ds_stream_scheduler:renew_streams(S1),
|
||||||
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
|
Interval = emqx_config:get([session_persistence, renew_streams_interval]),
|
||||||
Session = emqx_session:ensure_timer(
|
Session = emqx_session:ensure_timer(
|
||||||
|
@ -509,7 +504,6 @@ replay(ClientInfo, [], Session0 = #{s := S0}) ->
|
||||||
Session0,
|
Session0,
|
||||||
Streams
|
Streams
|
||||||
),
|
),
|
||||||
logger:error("Replay streams: ~p~n~p", [Streams, Session]),
|
|
||||||
%% Note: we filled the buffer with the historical messages, and
|
%% Note: we filled the buffer with the historical messages, and
|
||||||
%% from now on we'll rely on the normal inflight/flow control
|
%% from now on we'll rely on the normal inflight/flow control
|
||||||
%% mechanisms to replay them:
|
%% mechanisms to replay them:
|
||||||
|
@ -687,7 +681,7 @@ session_drop(ID, Reason) ->
|
||||||
case emqx_persistent_session_ds_state:open(ID) of
|
case emqx_persistent_session_ds_state:open(ID) of
|
||||||
{ok, S0} ->
|
{ok, S0} ->
|
||||||
?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
|
?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}),
|
||||||
_S = subs_fold(
|
_S = emqx_persistent_session_ds_subs:fold(
|
||||||
fun(TopicFilter, Subscription, S) ->
|
fun(TopicFilter, Subscription, S) ->
|
||||||
do_unsubscribe(ID, TopicFilter, Subscription, S)
|
do_unsubscribe(ID, TopicFilter, Subscription, S)
|
||||||
end,
|
end,
|
||||||
|
@ -905,74 +899,6 @@ do_drain_buffer(Inflight0, S0, Acc) ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Remove subscriptions that have been marked for deletion, and
|
|
||||||
%% that don't have any unacked messages:
|
|
||||||
subscription_gc(S0) ->
|
|
||||||
subs_fold_all(
|
|
||||||
fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) ->
|
|
||||||
case Deleted andalso has_no_unacked_streams(SubId, S0) of
|
|
||||||
true ->
|
|
||||||
emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
|
|
||||||
false ->
|
|
||||||
Acc
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
S0,
|
|
||||||
S0
|
|
||||||
).
|
|
||||||
|
|
||||||
has_no_unacked_streams(SubId, S) ->
|
|
||||||
emqx_persistent_session_ds_state:fold_streams(
|
|
||||||
fun
|
|
||||||
({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
|
|
||||||
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
|
|
||||||
(_StreamKey, _Srs, Acc) ->
|
|
||||||
Acc
|
|
||||||
end,
|
|
||||||
true,
|
|
||||||
S
|
|
||||||
).
|
|
||||||
|
|
||||||
%% @doc It only returns subscriptions that haven't been marked for deletion:
|
|
||||||
subs_lookup(TopicFilter, S) ->
|
|
||||||
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
|
||||||
case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
|
|
||||||
#{deleted := true} ->
|
|
||||||
undefined;
|
|
||||||
Sub ->
|
|
||||||
Sub
|
|
||||||
end.
|
|
||||||
|
|
||||||
subs_to_map(S) ->
|
|
||||||
subs_fold(
|
|
||||||
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
|
|
||||||
#{},
|
|
||||||
S
|
|
||||||
).
|
|
||||||
|
|
||||||
subs_fold(Fun, AccIn, S) ->
|
|
||||||
subs_fold_all(
|
|
||||||
fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
|
|
||||||
case Deleted of
|
|
||||||
true -> Acc;
|
|
||||||
false -> Fun(TopicFilter, Sub, Acc)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
AccIn,
|
|
||||||
S
|
|
||||||
).
|
|
||||||
|
|
||||||
%% @doc Iterate over all subscriptions, including the deleted ones:
|
|
||||||
subs_fold_all(Fun, AccIn, S) ->
|
|
||||||
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
|
||||||
emqx_topic_gbt:fold(
|
|
||||||
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
|
|
||||||
AccIn,
|
|
||||||
Subs
|
|
||||||
).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% TODO: find a more reliable way to perform actions that have side
|
%% TODO: find a more reliable way to perform actions that have side
|
||||||
%% effects. Add `CBM:init' callback to the session behavior?
|
%% effects. Add `CBM:init' callback to the session behavior?
|
||||||
-spec ensure_timers(session()) -> session().
|
-spec ensure_timers(session()) -> session().
|
||||||
|
|
|
@ -126,10 +126,10 @@ find_new_streams(S) ->
|
||||||
renew_streams(S0) ->
|
renew_streams(S0) ->
|
||||||
S1 = remove_unsubscribed_streams(S0),
|
S1 = remove_unsubscribed_streams(S0),
|
||||||
S2 = remove_fully_replayed_streams(S1),
|
S2 = remove_fully_replayed_streams(S1),
|
||||||
emqx_topic_gbt:fold(
|
emqx_persistent_session_ds_subs:fold(
|
||||||
fun
|
fun
|
||||||
(Key, _Subscription = #{start_time := StartTime, id := SubId, deleted := false}, Acc) ->
|
(Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) ->
|
||||||
TopicFilter = emqx_topic:words(emqx_trie_search:get_topic(Key)),
|
TopicFilter = emqx_topic:words(Key),
|
||||||
Streams = select_streams(
|
Streams = select_streams(
|
||||||
SubId,
|
SubId,
|
||||||
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
||||||
|
@ -146,7 +146,7 @@ renew_streams(S0) ->
|
||||||
Acc
|
Acc
|
||||||
end,
|
end,
|
||||||
S2,
|
S2,
|
||||||
emqx_persistent_session_ds_state:get_subscriptions(S2)
|
S2
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec on_unsubscribe(
|
-spec on_unsubscribe(
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc This module encapsulates the data related to the client's
|
||||||
|
%% subscriptions. It tries to reppresent the subscriptions as if they
|
||||||
|
%% were a simple key-value map.
|
||||||
|
%%
|
||||||
|
%% In reality, however, the session has to retain old the
|
||||||
|
%% subscriptions for longer to ensure the consistency of message
|
||||||
|
%% replay.
|
||||||
|
-module(emqx_persistent_session_ds_subs).
|
||||||
|
|
||||||
|
%% API:
|
||||||
|
-export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]).
|
||||||
|
|
||||||
|
-export_type([]).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Type declarations
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% API functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
%% @doc Process a new subscription
|
||||||
|
-spec on_subscribe(
|
||||||
|
emqx_persistent_session_ds:topic_filter(),
|
||||||
|
emqx_persistent_session_ds:subscription(),
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
on_subscribe(TopicFilter, Subscription, S) ->
|
||||||
|
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S).
|
||||||
|
|
||||||
|
%% @doc Process UNSUBSCRIBE
|
||||||
|
-spec on_unsubscribe(
|
||||||
|
emqx_persistent_session_ds:topic_filter(),
|
||||||
|
emqx_persistent_session_ds:subscription(),
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
on_unsubscribe(TopicFilter, Subscription0, S0) ->
|
||||||
|
%% Note: we cannot delete the subscription immediately, since its
|
||||||
|
%% metadata can be used during replay (see `process_batch'). We
|
||||||
|
%% instead mark it as deleted, and let `subscription_gc' function
|
||||||
|
%% dispatch it later:
|
||||||
|
Subscription = Subscription0#{deleted => true},
|
||||||
|
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0).
|
||||||
|
|
||||||
|
%% @doc Remove subscriptions that have been marked for deletion, and
|
||||||
|
%% that don't have any unacked messages:
|
||||||
|
-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
|
||||||
|
gc(S0) ->
|
||||||
|
fold_all(
|
||||||
|
fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) ->
|
||||||
|
case Deleted andalso has_no_unacked_streams(SubId, S0) of
|
||||||
|
true ->
|
||||||
|
emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc);
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
S0,
|
||||||
|
S0
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over active subscriptions:
|
||||||
|
-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
|
||||||
|
emqx_persistent_session_ds:subscription() | undefined.
|
||||||
|
lookup(TopicFilter, S) ->
|
||||||
|
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
||||||
|
case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of
|
||||||
|
#{deleted := true} ->
|
||||||
|
undefined;
|
||||||
|
Sub ->
|
||||||
|
Sub
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Convert active subscriptions to a map, for information
|
||||||
|
%% purpose:
|
||||||
|
-spec to_map(emqx_persistent_session_ds_state:t()) -> map().
|
||||||
|
to_map(S) ->
|
||||||
|
fold(
|
||||||
|
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
|
||||||
|
#{},
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over active subscriptions:
|
||||||
|
-spec fold(
|
||||||
|
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
|
||||||
|
Acc,
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
Acc.
|
||||||
|
fold(Fun, AccIn, S) ->
|
||||||
|
fold_all(
|
||||||
|
fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
|
||||||
|
case Deleted of
|
||||||
|
true -> Acc;
|
||||||
|
false -> Fun(TopicFilter, Sub, Acc)
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
AccIn,
|
||||||
|
S
|
||||||
|
).
|
||||||
|
|
||||||
|
%% @doc Fold over all subscriptions, including inactive ones:
|
||||||
|
-spec fold_all(
|
||||||
|
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc),
|
||||||
|
Acc,
|
||||||
|
emqx_persistent_session_ds_state:t()
|
||||||
|
) ->
|
||||||
|
Acc.
|
||||||
|
fold_all(Fun, AccIn, S) ->
|
||||||
|
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
|
||||||
|
emqx_topic_gbt:fold(
|
||||||
|
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
|
||||||
|
AccIn,
|
||||||
|
Subs
|
||||||
|
).
|
||||||
|
|
||||||
|
%%================================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%%================================================================================
|
||||||
|
|
||||||
|
-spec has_no_unacked_streams(
|
||||||
|
emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t()
|
||||||
|
) -> boolean().
|
||||||
|
has_no_unacked_streams(SubId, S) ->
|
||||||
|
emqx_persistent_session_ds_state:fold_streams(
|
||||||
|
fun
|
||||||
|
({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
|
||||||
|
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
|
||||||
|
(_StreamKey, _Srs, Acc) ->
|
||||||
|
Acc
|
||||||
|
end,
|
||||||
|
true,
|
||||||
|
S
|
||||||
|
).
|
Loading…
Reference in New Issue