diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b8c853431..20c382934 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -368,52 +368,31 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID} + Session ) -> - {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( - TopicFilter, SubOpts, Session - ), - case UpdateRouter of - true -> - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID); - false -> - ok - end, - S = emqx_persistent_session_ds_state:commit(S1), - UpdateRouter andalso - ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}), - {ok, Session#{s => S}}. + case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of + {ok, S1} -> + S = emqx_persistent_session_ds_state:commit(S1), + {ok, Session#{s => S}}; + Error = {error, _} -> + Error + end. -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, - Session = #{id := ID, s := S0} + Session = #{id := SessionId, s := S0} ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription = #{subopts := SubOpts} -> - S1 = do_unsubscribe(ID, TopicFilter, Subscription, S0), - S = emqx_persistent_session_ds_state:commit(S1), - {ok, Session#{s => S}, SubOpts} + case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of + {ok, S1, #{id := SubId, subopts := SubOpts}} -> + S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), + S = emqx_persistent_session_ds_state:commit(S2), + {ok, Session#{s => S}, SubOpts}; + Error = {error, _} -> + Error end. --spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> - emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0), - ?tp(persistent_session_ds_subscription_delete, #{ - session_id => SessionId, topic_filter => TopicFilter - }), - S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), - ?tp_span( - persistent_session_ds_subscription_route_delete, - #{session_id => SessionId, topic_filter => TopicFilter}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) - ), - S. - -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(#share{}, _) -> @@ -860,18 +839,12 @@ session_ensure_new( %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id(), _Reason) -> ok. -session_drop(ID, Reason) -> - case emqx_persistent_session_ds_state:open(ID) of +session_drop(SessionId, Reason) -> + case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> - ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), - _S = emqx_persistent_session_ds_subs:fold( - fun(TopicFilter, Subscription, S) -> - do_unsubscribe(ID, TopicFilter, Subscription, S) - end, - S0, - S0 - ), - emqx_persistent_session_ds_state:delete(ID); + ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}), + emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0), + emqx_persistent_session_ds_state:delete(SessionId); undefined -> ok end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 99ad9f9fc..8b4f70a69 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -26,7 +26,8 @@ %% API: -export([ on_subscribe/3, - on_unsubscribe/2, + on_unsubscribe/3, + on_session_drop/2, gc/1, lookup/2, to_map/1, @@ -41,6 +42,8 @@ -export_type([subscription_state_id/0, subscription/0, subscription_state/0]). -include("emqx_persistent_session_ds.hrl"). +-include("emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ %% Type declarations @@ -81,14 +84,15 @@ emqx_types:subopts(), emqx_persistent_session_ds:session() ) -> - {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. -on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> + {ok, emqx_persistent_session_ds_state:t()} | {error, ?RC_QUOTA_EXCEEDED}. +on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) -> #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> %% This is a new subscription: case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of true -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), SState = #{ @@ -105,16 +109,19 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> S = emqx_persistent_session_ds_state:put_subscription( TopicFilter, Subscription, S3 ), - {true, S}; + ?tp(persistent_session_ds_subscription_added, #{ + topic_filter => TopicFilter, session => SessionId + }), + {ok, S}; false -> - {false, S0} + {error, ?RC_QUOTA_EXCEEDED} end; Sub0 = #{current_state := SStateId0, id := SubId} -> SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of SState -> %% Client resubscribed with the same parameters: - {false, S0}; + {ok, S0}; _ -> %% Subsription parameters changed: {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), @@ -123,18 +130,46 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> ), Sub = Sub0#{current_state => SStateId}, S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), - {false, S} + {ok, S} end end. %% @doc Process UNSUBSCRIBE -spec on_unsubscribe( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t() ) -> - emqx_persistent_session_ds_state:t(). -on_unsubscribe(TopicFilter, S0) -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0). + {ok, emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()} + | {error, ?RC_NO_SUBSCRIPTION_EXISTED}. +on_unsubscribe(SessionId, TopicFilter, S0) -> + case lookup(TopicFilter, S0) of + undefined -> + {error, ?RC_NO_SUBSCRIPTION_EXISTED}; + Subscription -> + ?tp(persistent_session_ds_subscription_delete, #{ + session_id => SessionId, topic_filter => TopicFilter + }), + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => SessionId, topic_filter => TopicFilter}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) + ), + {ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription} + end. + +-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok. +on_session_drop(SessionId, S0) -> + fold( + fun(TopicFilter, _Subscription, S) -> + case on_unsubscribe(SessionId, TopicFilter, S) of + {ok, S1, _} -> S1; + _ -> S + end + end, + S0, + S0 + ). %% @doc Remove subscription states that don't have a parent, and that %% don't have any unacked messages: