From 648b6ac63ef4792c223be5eed72eee2f8c4a8bf6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 16 Nov 2023 16:35:46 +0700 Subject: [PATCH] chore(sessds): rename iterators -> subscriptions Also try to make clearer the difference between 2 flavors of topic filter representation in use. --- apps/emqx/src/emqx_persistent_session_ds.erl | 72 ++++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 6c0fc2dcc..ca3fc3514 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -83,11 +83,12 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type topic_filter() :: emqx_ds:topic_filter(). +-type topic_filter() :: emqx_types:topic(). +-type topic_filter_words() :: emqx_ds:topic_filter(). -type subscription_id() :: {id(), topic_filter()}. -type subscription() :: #{ start_time := emqx_ds:time(), - propts := map(), + props := map(), extra := map() }. -type session() :: #{ @@ -98,7 +99,7 @@ %% When the session should expire expires_at := timestamp() | never, %% Client’s Subscriptions. - iterators := #{topic() => subscription()}, + subscriptions := #{topic_filter() => subscription()}, %% Inflight messages inflight := emqx_persistent_message_ds_replayer:inflight(), %% Receive maximum @@ -108,7 +109,6 @@ }. -type timestamp() :: emqx_utils_calendar:epoch_millisecond(). --type topic() :: emqx_types:topic(). -type clientinfo() :: emqx_types:clientinfo(). -type conninfo() :: emqx_session:conninfo(). -type replies() :: emqx_session:replies(). @@ -195,9 +195,9 @@ info(created_at, #{created_at := CreatedAt}) -> CreatedAt; info(is_persistent, #{}) -> true; -info(subscriptions, #{iterators := Iters}) -> +info(subscriptions, #{subscriptions := Iters}) -> maps:map(fun(_, #{props := SubOpts}) -> SubOpts end, Iters); -info(subscriptions_cnt, #{iterators := Iters}) -> +info(subscriptions_cnt, #{subscriptions := Iters}) -> maps:size(Iters); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); @@ -239,47 +239,47 @@ stats(Session) -> %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %%-------------------------------------------------------------------- --spec subscribe(topic(), emqx_types:subopts(), session()) -> +-spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. subscribe( TopicFilter, SubOpts, - Session = #{id := ID, iterators := Iters} -) when is_map_key(TopicFilter, Iters) -> - Iterator = maps:get(TopicFilter, Iters), - NIterator = update_subscription(TopicFilter, Iterator, SubOpts, ID), - {ok, Session#{iterators := Iters#{TopicFilter => NIterator}}}; + Session = #{id := ID, subscriptions := Subs} +) when is_map_key(TopicFilter, Subs) -> + Subscription = maps:get(TopicFilter, Subs), + NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID), + {ok, Session#{subscriptions := Subs#{TopicFilter => NSubscription}}}; subscribe( TopicFilter, SubOpts, - Session = #{id := ID, iterators := Iters} + Session = #{id := ID, subscriptions := Subs} ) -> % TODO: max_subscriptions - Iterator = add_subscription(TopicFilter, SubOpts, ID), - {ok, Session#{iterators := Iters#{TopicFilter => Iterator}}}. + Subscription = add_subscription(TopicFilter, SubOpts, ID), + {ok, Session#{subscriptions := Subs#{TopicFilter => Subscription}}}. --spec unsubscribe(topic(), session()) -> +-spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, - Session = #{id := ID, iterators := Iters} -) when is_map_key(TopicFilter, Iters) -> - Iterator = maps:get(TopicFilter, Iters), - SubOpts = maps:get(props, Iterator), + Session = #{id := ID, subscriptions := Subs} +) when is_map_key(TopicFilter, Subs) -> + Subscription = maps:get(TopicFilter, Subs), + SubOpts = maps:get(props, Subscription), ok = del_subscription(TopicFilter, ID), - {ok, Session#{iterators := maps:remove(TopicFilter, Iters)}, SubOpts}; + {ok, Session#{subscriptions := maps:remove(TopicFilter, Subs)}, SubOpts}; unsubscribe( _TopicFilter, _Session = #{} ) -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}. --spec get_subscription(topic(), session()) -> +-spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. -get_subscription(TopicFilter, #{iterators := Iters}) -> - case maps:get(TopicFilter, Iters, undefined) of - Iterator = #{} -> - maps:get(props, Iterator); +get_subscription(TopicFilter, #{subscriptions := Subs}) -> + case maps:get(TopicFilter, Subs, undefined) of + Subscription = #{} -> + maps:get(props, Subscription); undefined -> undefined end. @@ -292,7 +292,7 @@ get_subscription(TopicFilter, #{iterators := Iters}) -> {ok, emqx_types:publish_result(), replies(), session()} | {error, emqx_types:reason_code()}. publish(_PacketId, Msg, Session) -> - %% TODO: + %% TODO: QoS2 Result = emqx_broker:publish(Msg), {ok, Result, [], Session}. @@ -397,7 +397,7 @@ terminate(_Reason, _Session = #{}) -> %%-------------------------------------------------------------------- --spec add_subscription(topic(), emqx_types:subopts(), id()) -> +-spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> subscription(). add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> %% N.B.: we chose to update the router before adding the subscription to the @@ -427,7 +427,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> %% we'll list streams and open iterators when implementing message replay. DSSubExt. --spec update_subscription(topic(), subscription(), emqx_types:subopts(), id()) -> +-spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) -> subscription(). update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> TopicFilter = emqx_topic:words(TopicFilterBin), @@ -437,7 +437,7 @@ update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}), NDSSubExt. --spec del_subscription(topic(), id()) -> +-spec del_subscription(topic_filter(), id()) -> ok. del_subscription(TopicFilterBin, DSSessionId) -> TopicFilter = emqx_topic:words(TopicFilterBin), @@ -522,7 +522,7 @@ storage() -> %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. -spec session_open(id()) -> - {ok, session(), #{topic() => subscription()}} | false. + {ok, session(), #{topic_filter_words() => subscription()}} | false. session_open(SessionId) -> transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of @@ -537,7 +537,7 @@ session_open(SessionId) -> end). -spec session_ensure_new(id(), _Props :: map()) -> - {ok, session(), #{topic() => subscription()}}. + {ok, session(), #{topic_filter_words() => subscription()}}. session_ensure_new(SessionId, Props) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), @@ -581,7 +581,7 @@ session_drop_subscriptions(DSSessionId) -> ). %% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> +-spec session_add_subscription(id(), topic_filter_words(), _Props :: map()) -> {ok, subscription(), _IsNew :: boolean()}. session_add_subscription(DSSessionId, TopicFilter, Props) -> DSSubId = {DSSessionId, TopicFilter}, @@ -606,7 +606,7 @@ session_add_subscription(DSSessionId, TopicFilter, Props) -> end end). --spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub(). +-spec session_insert_subscription(id(), topic_filter_words(), map()) -> ds_sub(). session_insert_subscription(DSSessionId, TopicFilter, Props) -> {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), DSSub = #ds_sub{ @@ -641,7 +641,7 @@ session_read_subscriptions(DSSessionId) -> ), mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). --spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. +-spec new_subscription_id(id(), topic_filter_words()) -> {subscription_id(), integer()}. new_subscription_id(DSSessionId, TopicFilter) -> %% Note: here we use _milliseconds_ to match with the timestamp %% field of `#message' record. @@ -688,7 +688,7 @@ renew_streams(DSSessionId) -> Subscriptions ). --spec renew_streams(id(), [ds_stream()], emqx_ds:topic_filter(), emqx_ds:time()) -> ok. +-spec renew_streams(id(), [ds_stream()], topic_filter_words(), emqx_ds:time()) -> ok. renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) -> AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), transaction(