feat(queue): fix dialyzer issues

This commit is contained in:
Ilya Averyanov 2024-07-10 20:25:53 +03:00
parent 8705956cdc
commit a97a0d6400
4 changed files with 13 additions and 12 deletions

View File

@ -52,7 +52,7 @@
t/0, t/0,
subscription/0, subscription/0,
session_id/0, session_id/0,
stream_lease/0, stream_lease_event/0,
opts/0 opts/0
]). ]).
@ -84,7 +84,7 @@
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. -callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). -callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t(). -callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). -callback on_disconnect(t(), [stream_progress()]) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}. -callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). -callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t(). -callback on_info(t(), term()) -> t().

View File

@ -126,7 +126,8 @@ on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
on_unsubscribe(State, ShareTopicFilter, GroupProgress) -> on_unsubscribe(State, ShareTopicFilter, GroupProgress) ->
delete_shared_subscription(State, ShareTopicFilter, GroupProgress). delete_shared_subscription(State, ShareTopicFilter, GroupProgress).
-spec renew_streams(t()) -> {[emqx_persistent_session_ds_shared_subs:agent_stream_event()], t()}. -spec renew_streams(t()) ->
{[emqx_persistent_session_ds_shared_subs_agent:stream_lease_event()], t()}.
renew_streams(#{} = State) -> renew_streams(#{} = State) ->
fetch_stream_events(State). fetch_stream_events(State).

View File

@ -33,7 +33,7 @@
]). ]).
-export_type([ -export_type([
group_sm/0, t/0,
options/0, options/0,
state/0 state/0
]). ]).
@ -97,7 +97,7 @@
-type timer_name() :: atom(). -type timer_name() :: atom().
-type timer() :: #timer{}. -type timer() :: #timer{}.
-type group_sm() :: #{ -type t() :: #{
share_topic_filter => emqx_persistent_session_ds:share_topic_filter(), share_topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(), agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()), send_after => fun((non_neg_integer(), term()) -> reference()),
@ -112,7 +112,7 @@
%% API %% API
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
-spec new(options()) -> group_sm(). -spec new(options()) -> t().
new(#{ new(#{
session_id := SessionId, session_id := SessionId,
agent := Agent, agent := Agent,
@ -139,8 +139,8 @@ new(#{
}), }),
transition(GSM0, ?connecting, #{}). transition(GSM0, ?connecting, #{}).
-spec fetch_stream_events(group_sm()) -> -spec fetch_stream_events(t()) ->
{group_sm(), [emqx_ds_shared_sub_agent:external_lease_event()]}. {t(), [emqx_ds_shared_sub_agent:external_lease_event()]}.
fetch_stream_events( fetch_stream_events(
#{ #{
state := _State, state := _State,
@ -156,7 +156,7 @@ fetch_stream_events(
), ),
{GSM#{stream_lease_events => []}, Events1}. {GSM#{stream_lease_events => []}, Events1}.
-spec handle_disconnect(group_sm(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> group_sm(). -spec handle_disconnect(t(), emqx_ds_shared_sub_proto:agent_stream_progress()) -> t().
handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) -> handle_disconnect(#{state := ?connecting} = GSM, _StreamProgresses) ->
transition(GSM, ?disconnected, #{}); transition(GSM, ?disconnected, #{});
handle_disconnect( handle_disconnect(
@ -378,8 +378,8 @@ handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
}), }),
transition(GSM, ?connecting, #{}). transition(GSM, ?connecting, #{}).
-spec handle_stream_progress(group_sm(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) -> -spec handle_stream_progress(t(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) ->
group_sm(). t().
handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
GSM; GSM;
handle_stream_progress( handle_stream_progress(

View File

@ -132,7 +132,7 @@
%% TODO: Not implemented %% TODO: Not implemented
-type iterator_id() :: term(). -type iterator_id() :: term().
-opaque iterator() :: ds_specific_iterator(). -type iterator() :: ds_specific_iterator().
-opaque delete_iterator() :: ds_specific_delete_iterator(). -opaque delete_iterator() :: ds_specific_delete_iterator().