From 113a990482bbddb61d46fd75c4da57914aada27f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 11:13:17 +0200 Subject: [PATCH] feat(sessds): Support max subscriptions --- apps/emqx/src/emqx_persistent_session_ds.erl | 9 ++-- .../src/emqx_persistent_session_ds_subs.erl | 41 ++++++++++++------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5cae8487d..c25e3c813 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -367,10 +367,10 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}} + Session = #{id := ID} ) -> {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( - TopicFilter, UpgradeQoS, SubOpts, S0 + TopicFilter, SubOpts, Session ), case UpdateRouter of true -> @@ -379,9 +379,8 @@ subscribe( ok end, S = emqx_persistent_session_ds_state:commit(S1), - ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, is_new => UpdateRouter - }), + UpdateRouter andalso + ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}), {ok, Session#{s => S}}. -spec unsubscribe(topic_filter(), session()) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index e9e2a97ee..1993370ed 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -25,7 +25,7 @@ %% API: -export([ - on_subscribe/4, + on_subscribe/3, on_unsubscribe/2, gc/1, lookup/2, @@ -73,26 +73,37 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - boolean(), emqx_types:subopts(), - emqx_persistent_session_ds_state:t() + emqx_persistent_session_ds:session() ) -> {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. -on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) -> +on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> + #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> %% This is a new subscription: - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), - SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, - S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2), - Subscription = #{ - id => SubId, - current_state => SStateId, - start_time => now_ms() - }, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3), - {true, S}; + case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of + true -> + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{ + parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts + }, + S3 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S2 + ), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, Subscription, S3 + ), + {true, S}; + false -> + {false, S0} + end; Sub0 = #{current_state := SStateId0, id := SubId} -> SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of