From a97a0d64006a3ecadd285d1365c032b2aeca4676 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 10 Jul 2024 20:25:53 +0300 Subject: [PATCH] feat(queue): fix dialyzer issues --- ...x_persistent_session_ds_shared_subs_agent.erl | 4 ++-- .../src/emqx_ds_shared_sub_agent.erl | 3 ++- .../src/emqx_ds_shared_sub_group_sm.erl | 16 ++++++++-------- apps/emqx_durable_storage/src/emqx_ds.erl | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index 022963ad9..dff66de0f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -52,7 +52,7 @@ t/0, subscription/0, session_id/0, - stream_lease/0, + stream_lease_event/0, opts/0 ]). @@ -84,7 +84,7 @@ -callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. -callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). -callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t(). --callback on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). +-callback on_disconnect(t(), [stream_progress()]) -> t(). -callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). -callback on_info(t(), term()) -> t(). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index fea711d0f..5b71a93e5 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -126,7 +126,8 @@ on_subscribe(State0, ShareTopicFilter, _SubOpts) -> on_unsubscribe(State, ShareTopicFilter, GroupProgress) -> delete_shared_subscription(State, ShareTopicFilter, GroupProgress). --spec renew_streams(t()) -> {[emqx_persistent_session_ds_shared_subs:agent_stream_event()], t()}. +-spec renew_streams(t()) -> + {[emqx_persistent_session_ds_shared_subs_agent:stream_lease_event()], t()}. renew_streams(#{} = State) -> fetch_stream_events(State). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 2b37328a2..a648bbaef 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -33,7 +33,7 @@ ]). -export_type([ - group_sm/0, + t/0, options/0, state/0 ]). @@ -97,7 +97,7 @@ -type timer_name() :: atom(). -type timer() :: #timer{}. --type group_sm() :: #{ +-type t() :: #{ share_topic_filter => emqx_persistent_session_ds:share_topic_filter(), agent => emqx_ds_shared_sub_proto:agent(), send_after => fun((non_neg_integer(), term()) -> reference()), @@ -112,7 +112,7 @@ %% API %%----------------------------------------------------------------------- --spec new(options()) -> group_sm(). +-spec new(options()) -> t(). new(#{ session_id := SessionId, agent := Agent, @@ -139,8 +139,8 @@ new(#{ }), transition(GSM0, ?connecting, #{}). --spec fetch_stream_events(group_sm()) -> - {group_sm(), [emqx_ds_shared_sub_agent:external_lease_event()]}. +-spec fetch_stream_events(t()) -> + {t(), [emqx_ds_shared_sub_agent:external_lease_event()]}. fetch_stream_events( #{ state := _State, @@ -156,7 +156,7 @@ fetch_stream_events( ), {GSM#{stream_lease_events => []}, Events1}. --spec handle_disconnect(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> group_sm(). +-spec handle_disconnect(t(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> t(). handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) -> transition(GSM, ?disconnected, #{}); handle_disconnect( @@ -378,8 +378,8 @@ handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) -> }), transition(GSM, ?connecting, #{}). --spec handle_stream_progress(group_sm(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) -> - group_sm(). +-spec handle_stream_progress(t(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) -> + t(). handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> GSM; handle_stream_progress( diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 6aaba205d..38d63e41f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -132,7 +132,7 @@ %% TODO: Not implemented -type iterator_id() :: term(). --opaque iterator() :: ds_specific_iterator(). +-type iterator() :: ds_specific_iterator(). -opaque delete_iterator() :: ds_specific_delete_iterator().