diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 90d86bb1d..bad8352c8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -126,21 +126,31 @@ | ?rec | ?committed(?QOS_2). +-define(id, id). +-define(dirty, dirty). +-define(metadata, metadata). +-define(subscriptions, subscriptions). +-define(subscription_states, subscription_states). +-define(seqnos, seqnos). +-define(streams, streams). +-define(ranks, ranks). +-define(awaiting_rel, awaiting_rel). + -opaque t() :: #{ - id := emqx_persistent_session_ds:id(), - dirty := boolean(), - metadata := metadata(), - subscriptions := pmap( + ?id := emqx_persistent_session_ds:id(), + ?dirty := boolean(), + ?metadata := metadata(), + ?subscriptions := pmap( emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription() ), - subscription_states := pmap( + ?subscription_states := pmap( emqx_persistent_session_ds_subs:subscription_state_id(), emqx_persistent_session_ds_subs:subscription_state() ), - seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), - streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), - ranks := pmap(term(), integer()), - awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) + ?seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), + ?streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), + ?ranks := pmap(term(), integer()), + ?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) }. -define(session_tab, emqx_ds_session_tab). @@ -152,12 +162,12 @@ -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). -define(pmaps, [ - {subscriptions, ?subscription_tab}, - {subscription_states, ?subscription_states_tab}, - {streams, ?stream_tab}, - {seqnos, ?seqno_tab}, - {ranks, ?rank_tab}, - {awaiting_rel, ?awaiting_rel_tab} + {?subscriptions, ?subscription_tab}, + {?subscription_states, ?subscription_states_tab}, + {?streams, ?stream_tab}, + {?seqnos, ?seqno_tab}, + {?ranks, ?rank_tab}, + {?awaiting_rel, ?awaiting_rel_tab} ]). %% Enable this flag if you suspect some code breaks the sequence: @@ -358,15 +368,15 @@ new_id(Rec) -> -spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> emqx_persistent_session_ds_subs:subscription() | undefined. get_subscription(TopicFilter, Rec) -> - gen_get(subscriptions, TopicFilter, Rec). + gen_get(?subscriptions, TopicFilter, Rec). -spec fold_subscriptions(fun(), Acc, t()) -> Acc. fold_subscriptions(Fun, Acc, Rec) -> - gen_fold(subscriptions, Fun, Acc, Rec). + gen_fold(?subscriptions, Fun, Acc, Rec). -spec n_subscriptions(t()) -> non_neg_integer(). n_subscriptions(Rec) -> - gen_size(subscriptions, Rec). + gen_size(?subscriptions, Rec). -spec put_subscription( emqx_persistent_session_ds:topic_filter(), @@ -374,22 +384,22 @@ n_subscriptions(Rec) -> t() ) -> t(). put_subscription(TopicFilter, Subscription, Rec) -> - gen_put(subscriptions, TopicFilter, Subscription, Rec). + gen_put(?subscriptions, TopicFilter, Subscription, Rec). -spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t(). del_subscription(TopicFilter, Rec) -> - gen_del(subscriptions, TopicFilter, Rec). + gen_del(?subscriptions, TopicFilter, Rec). %% -spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> emqx_persistent_session_ds_subs:subscription_state() | undefined. get_subscription_state(SStateId, Rec) -> - gen_get(subscription_states, SStateId, Rec). + gen_get(?subscription_states, SStateId, Rec). -spec fold_subscription_states(fun(), Acc, t()) -> Acc. fold_subscription_states(Fun, Acc, Rec) -> - gen_fold(subscription_states, Fun, Acc, Rec). + gen_fold(?subscription_states, Fun, Acc, Rec). -spec put_subscription_state( emqx_persistent_session_ds_subs:subscription_state_id(), @@ -397,11 +407,11 @@ fold_subscription_states(Fun, Acc, Rec) -> t() ) -> t(). put_subscription_state(SStateId, SState, Rec) -> - gen_put(subscription_states, SStateId, SState, Rec). + gen_put(?subscription_states, SStateId, SState, Rec). -spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t(). del_subscription_state(SStateId, Rec) -> - gen_del(subscription_states, SStateId, Rec). + gen_del(?subscription_states, SStateId, Rec). %% @@ -410,33 +420,33 @@ del_subscription_state(SStateId, Rec) -> -spec get_stream(stream_key(), t()) -> emqx_persistent_session_ds:stream_state() | undefined. get_stream(Key, Rec) -> - gen_get(streams, Key, Rec). + gen_get(?streams, Key, Rec). -spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). put_stream(Key, Val, Rec) -> - gen_put(streams, Key, Val, Rec). + gen_put(?streams, Key, Val, Rec). -spec del_stream(stream_key(), t()) -> t(). del_stream(Key, Rec) -> - gen_del(streams, Key, Rec). + gen_del(?streams, Key, Rec). -spec fold_streams(fun(), Acc, t()) -> Acc. fold_streams(Fun, Acc, Rec) -> - gen_fold(streams, Fun, Acc, Rec). + gen_fold(?streams, Fun, Acc, Rec). -spec n_streams(t()) -> non_neg_integer(). n_streams(Rec) -> - gen_size(streams, Rec). + gen_size(?streams, Rec). %% -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. get_seqno(Key, Rec) -> - gen_get(seqnos, Key, Rec). + gen_get(?seqnos, Key, Rec). -spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). put_seqno(Key, Val, Rec) -> - gen_put(seqnos, Key, Val, Rec). + gen_put(?seqnos, Key, Val, Rec). %% @@ -444,41 +454,41 @@ put_seqno(Key, Val, Rec) -> -spec get_rank(rank_key(), t()) -> integer() | undefined. get_rank(Key, Rec) -> - gen_get(ranks, Key, Rec). + gen_get(?ranks, Key, Rec). -spec put_rank(rank_key(), integer(), t()) -> t(). put_rank(Key, Val, Rec) -> - gen_put(ranks, Key, Val, Rec). + gen_put(?ranks, Key, Val, Rec). -spec del_rank(rank_key(), t()) -> t(). del_rank(Key, Rec) -> - gen_del(ranks, Key, Rec). + gen_del(?ranks, Key, Rec). -spec fold_ranks(fun(), Acc, t()) -> Acc. fold_ranks(Fun, Acc, Rec) -> - gen_fold(ranks, Fun, Acc, Rec). + gen_fold(?ranks, Fun, Acc, Rec). %% -spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined. get_awaiting_rel(Key, Rec) -> - gen_get(awaiting_rel, Key, Rec). + gen_get(?awaiting_rel, Key, Rec). -spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t(). put_awaiting_rel(Key, Val, Rec) -> - gen_put(awaiting_rel, Key, Val, Rec). + gen_put(?awaiting_rel, Key, Val, Rec). -spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t(). del_awaiting_rel(Key, Rec) -> - gen_del(awaiting_rel, Key, Rec). + gen_del(?awaiting_rel, Key, Rec). -spec fold_awaiting_rel(fun(), Acc, t()) -> Acc. fold_awaiting_rel(Fun, Acc, Rec) -> - gen_fold(awaiting_rel, Fun, Acc, Rec). + gen_fold(?awaiting_rel, Fun, Acc, Rec). -spec n_awaiting_rel(t()) -> non_neg_integer(). n_awaiting_rel(Rec) -> - gen_size(awaiting_rel, Rec). + gen_size(?awaiting_rel, Rec). %%