diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 7db86dfe0..eb45ef014 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -107,7 +107,7 @@ open(S, Opts) -> Agent = emqx_persistent_session_ds_shared_subs_agent:open( SharedSubscriptions, agent_opts(Opts) ), - SharedSubS = #{agent => Agent}, + SharedSubS = #{agent => Agent, scheduled_actions => #{}}, {ok, S, SharedSubS}. %%-------------------------------------------------------------------- @@ -136,6 +136,7 @@ on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Sessio on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> update_subscription(Subscription, TopicFilter, SubOpts, Session). +-dialyzer({nowarn_function, create_new_subscription/3}). create_new_subscription(TopicFilter, SubOpts, #{ s := S0, shared_sub_s := #{agent := Agent} = SharedSubS0, @@ -190,6 +191,7 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt {ok, S, SharedSubS} end. +-dialyzer({nowarn_function, schedule_subscribe/3}). schedule_subscribe( #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts ) -> @@ -605,6 +607,7 @@ to_agent_subscription(_S, Subscription) -> agent_opts(#{session_id := SessionId}) -> #{session_id => SessionId}. +-dialyzer({nowarn_function, now_ms/0}). now_ms() -> erlang:system_time(millisecond). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl index 689c4ba89..5cde51f16 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl @@ -34,6 +34,14 @@ rank_x() => x_progress() }. +-export_type([ + t/0 +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + -spec init() -> t(). init() -> #{}. @@ -47,8 +55,8 @@ set_replayed({{RankX, RankY}, Stream}, State) -> _ -> ?SLOG( warning, - leader_rank_progress_double_or_invalid_update, #{ + msg => leader_rank_progress_double_or_invalid_update, rank_x => RankX, rank_y => RankY, state => State @@ -57,7 +65,8 @@ set_replayed({{RankX, RankY}, Stream}, State) -> State end. --spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) -> false | {true, t()}. +-spec add_streams([{emqx_ds:stream_rank(), emqx_ds:stream()}], t()) -> + {[{emqx_ds:stream_rank(), emqx_ds:stream()}], t()}. add_streams(StreamsWithRanks, State) -> SortedStreamsWithRanks = lists:sort( fun({{_RankX1, RankY1}, _Stream1}, {{_RankX2, RankY2}, _Stream2}) ->