From e408804efb7528595e15772c79a20566fba23993 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 19 Jul 2024 21:17:29 +0300 Subject: [PATCH] feat(queue): fix dialyzer issues --- apps/emqx/src/emqx_persistent_session_ds.erl | 4 ++-- .../emqx_persistent_session_ds_shared_subs.erl | 2 +- .../emqx_persistent_session_ds_state.erl | 4 +++- apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl | 9 +++++++-- rel/i18n/emqx_ds_shared_sub_schema.hocon | 6 ++++++ 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index bd763e62f..b86b44611 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -821,10 +821,10 @@ list_client_subscriptions(ClientId) -> {error, not_found} end. --spec get_client_subscription(emqx_types:clientid(), topic_filter()) -> +-spec get_client_subscription(emqx_types:clientid(), topic_filter() | share_topic_filter()) -> subscription() | undefined. get_client_subscription(ClientId, #share{} = ShareTopicFilter) -> - emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, ShareTopicFilter); + emqx_persistent_session_ds_shared_subs:cold_get_subscription(ClientId, ShareTopicFilter); get_client_subscription(ClientId, TopicFilter) -> emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter). diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 11b89441d..5b54c6f73 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -606,7 +606,7 @@ to_map(S, _SharedSubS) -> %%-------------------------------------------------------------------- %% cold_get_subscription --spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> +-spec cold_get_subscription(emqx_persistent_session_ds:id(), share_topic_filter()) -> emqx_persistent_session_ds:subscription() | undefined. cold_get_subscription(SessionId, ShareTopicFilter) -> case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl index 1d60250ea..3d3840307 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl @@ -399,7 +399,9 @@ new_id(Rec) -> get_subscription(TopicFilter, Rec) -> gen_get(?subscriptions, TopicFilter, Rec). --spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> +-spec cold_get_subscription( + emqx_persistent_session_ds:id(), emqx_types:topic() | emqx_types:share() +) -> [emqx_persistent_session_ds_subs:subscription()]. cold_get_subscription(SessionId, Topic) -> kv_pmap_read(?subscription_tab, SessionId, Topic). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 5b71a93e5..a90f1286d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -9,6 +9,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_ds_shared_sub_proto.hrl"). +-include("emqx_ds_shared_sub_config.hrl"). -export([ new/1, @@ -109,9 +110,13 @@ open(TopicSubscriptions, Opts) -> ), State1. --spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok. +-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> + ok | {error, emqx_types:reason_code()}. can_subscribe(_State, _ShareTopicFilter, _SubOpts) -> - ok. + case ?dq_config(enable) of + true -> ok; + false -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED} + end. -spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). on_subscribe(State0, ShareTopicFilter, _SubOpts) -> diff --git a/rel/i18n/emqx_ds_shared_sub_schema.hocon b/rel/i18n/emqx_ds_shared_sub_schema.hocon index 5a95e9693..2ee28cc30 100644 --- a/rel/i18n/emqx_ds_shared_sub_schema.hocon +++ b/rel/i18n/emqx_ds_shared_sub_schema.hocon @@ -1,5 +1,11 @@ emqx_ds_shared_sub_schema { +enable.desc: +"""Enable the shared subscription feature.""" + +enable.label: +"""Enable Shared Subscription""" + session_find_leader_timeout_ms.desc: """The timeout in milliseconds for the session to find a leader. If the session cannot find a leader within this time, the session will retry."""