diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5cae8487d..c25e3c813 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -367,10 +367,10 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}} + Session = #{id := ID} ) -> {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( - TopicFilter, UpgradeQoS, SubOpts, S0 + TopicFilter, SubOpts, Session ), case UpdateRouter of true -> @@ -379,9 +379,8 @@ subscribe( ok end, S = emqx_persistent_session_ds_state:commit(S1), - ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, is_new => UpdateRouter - }), + UpdateRouter andalso + ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}), {ok, Session#{s => S}}. -spec unsubscribe(topic_filter(), session()) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index e9e2a97ee..1993370ed 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -25,7 +25,7 @@ %% API: -export([ - on_subscribe/4, + on_subscribe/3, on_unsubscribe/2, gc/1, lookup/2, @@ -73,26 +73,37 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - boolean(), emqx_types:subopts(), - emqx_persistent_session_ds_state:t() + emqx_persistent_session_ds:session() ) -> {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. -on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) -> +on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> + #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> %% This is a new subscription: - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), - SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, - S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2), - Subscription = #{ - id => SubId, - current_state => SStateId, - start_time => now_ms() - }, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3), - {true, S}; + case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of + true -> + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{ + parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts + }, + S3 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S2 + ), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, Subscription, S3 + ), + {true, S}; + false -> + {false, S0} + end; Sub0 = #{current_state := SStateId0, id := SubId} -> SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of