feat(queue): fix dialyzer issues

This commit is contained in:
Ilya Averyanov 2024-07-04 17:51:42 +03:00
parent 65ab81ff74
commit 91dd1183ad
2 changed files with 15 additions and 3 deletions

View File

@ -107,7 +107,7 @@ open(S, Opts) ->
Agent = emqx_persistent_session_ds_shared_subs_agent:open( Agent = emqx_persistent_session_ds_shared_subs_agent:open(
SharedSubscriptions, agent_opts(Opts) SharedSubscriptions, agent_opts(Opts)
), ),
SharedSubS = #{agent => Agent}, SharedSubS = #{agent => Agent, scheduled_actions => #{}},
{ok, S, SharedSubS}. {ok, S, SharedSubS}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -136,6 +136,7 @@ on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Sessio
on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session). update_subscription(Subscription, TopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{ create_new_subscription(TopicFilter, SubOpts, #{
s := S0, s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0, shared_sub_s := #{agent := Agent} = SharedSubS0,
@ -190,6 +191,7 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
{ok, S, SharedSubS} {ok, S, SharedSubS}
end. end.
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe( schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts
) -> ) ->
@ -605,6 +607,7 @@ to_agent_subscription(_S, Subscription) ->
agent_opts(#{session_id := SessionId}) -> agent_opts(#{session_id := SessionId}) ->
#{session_id => SessionId}. #{session_id => SessionId}.
-dialyzer({nowarn_function, now_ms/0}).
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).

View File

@ -34,6 +34,14 @@
rank_x() => x_progress() rank_x() => x_progress()
}. }.
-export_type([
t/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec init() -> t(). -spec init() -> t().
init() -> #{}. init() -> #{}.
@ -47,8 +55,8 @@ set_replayed({{RankX, RankY}, Stream}, State) ->
_ -> _ ->
?SLOG( ?SLOG(
warning, warning,
leader_rank_progress_double_or_invalid_update,
#{ #{
msg => leader_rank_progress_double_or_invalid_update,
rank_x => RankX, rank_x => RankX,
rank_y => RankY, rank_y => RankY,
state => State state => State
@ -57,7 +65,8 @@ set_replayed({{RankX, RankY}, Stream}, State) ->
State State
end. end.
-spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) -> false | {true, t()}. -spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) ->
{[{emqx_ds:stream_rank(), emqx_ds:stream()}], t()}.
add_streams(StreamsWithRanks, State) -> add_streams(StreamsWithRanks, State) ->
SortedStreamsWithRanks = lists:sort( SortedStreamsWithRanks = lists:sort(
fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) -> fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) ->