chore(sessds): rename iterators -> subscriptions

Also try to make clearer the difference between 2 flavors of topic
filter representation in use.
This commit is contained in:
Andrew Mayorov 2023-11-16 16:35:46 +07:00
parent 1395f1c424
commit 648b6ac63e
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 36 additions and 36 deletions

View File

@ -83,11 +83,12 @@
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
%% an atom, in theory (?). %% an atom, in theory (?).
-type id() :: binary(). -type id() :: binary().
-type topic_filter() :: emqx_ds:topic_filter(). -type topic_filter() :: emqx_types:topic().
-type topic_filter_words() :: emqx_ds:topic_filter().
-type subscription_id() :: {id(), topic_filter()}. -type subscription_id() :: {id(), topic_filter()}.
-type subscription() :: #{ -type subscription() :: #{
start_time := emqx_ds:time(), start_time := emqx_ds:time(),
propts := map(), props := map(),
extra := map() extra := map()
}. }.
-type session() :: #{ -type session() :: #{
@ -98,7 +99,7 @@
%% When the session should expire %% When the session should expire
expires_at := timestamp() | never, expires_at := timestamp() | never,
%% Clients Subscriptions. %% Clients Subscriptions.
iterators := #{topic() => subscription()}, subscriptions := #{topic_filter() => subscription()},
%% Inflight messages %% Inflight messages
inflight := emqx_persistent_message_ds_replayer:inflight(), inflight := emqx_persistent_message_ds_replayer:inflight(),
%% Receive maximum %% Receive maximum
@ -108,7 +109,6 @@
}. }.
-type timestamp() :: emqx_utils_calendar:epoch_millisecond(). -type timestamp() :: emqx_utils_calendar:epoch_millisecond().
-type topic() :: emqx_types:topic().
-type clientinfo() :: emqx_types:clientinfo(). -type clientinfo() :: emqx_types:clientinfo().
-type conninfo() :: emqx_session:conninfo(). -type conninfo() :: emqx_session:conninfo().
-type replies() :: emqx_session:replies(). -type replies() :: emqx_session:replies().
@ -195,9 +195,9 @@ info(created_at, #{created_at := CreatedAt}) ->
CreatedAt; CreatedAt;
info(is_persistent, #{}) -> info(is_persistent, #{}) ->
true; true;
info(subscriptions, #{iterators := Iters}) -> info(subscriptions, #{subscriptions := Iters}) ->
maps:map(fun(_, #{props := SubOpts}) -> SubOpts end, Iters); maps:map(fun(_, #{props := SubOpts}) -> SubOpts end, Iters);
info(subscriptions_cnt, #{iterators := Iters}) -> info(subscriptions_cnt, #{subscriptions := Iters}) ->
maps:size(Iters); maps:size(Iters);
info(subscriptions_max, #{props := Conf}) -> info(subscriptions_max, #{props := Conf}) ->
maps:get(max_subscriptions, Conf); maps:get(max_subscriptions, Conf);
@ -239,47 +239,47 @@ stats(Session) ->
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec subscribe(topic(), emqx_types:subopts(), session()) -> -spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
{ok, session()} | {error, emqx_types:reason_code()}. {ok, session()} | {error, emqx_types:reason_code()}.
subscribe( subscribe(
TopicFilter, TopicFilter,
SubOpts, SubOpts,
Session = #{id := ID, iterators := Iters} Session = #{id := ID, subscriptions := Subs}
) when is_map_key(TopicFilter, Iters) -> ) when is_map_key(TopicFilter, Subs) ->
Iterator = maps:get(TopicFilter, Iters), Subscription = maps:get(TopicFilter, Subs),
NIterator = update_subscription(TopicFilter, Iterator, SubOpts, ID), NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID),
{ok, Session#{iterators := Iters#{TopicFilter => NIterator}}}; {ok, Session#{subscriptions := Subs#{TopicFilter => NSubscription}}};
subscribe( subscribe(
TopicFilter, TopicFilter,
SubOpts, SubOpts,
Session = #{id := ID, iterators := Iters} Session = #{id := ID, subscriptions := Subs}
) -> ) ->
% TODO: max_subscriptions % TODO: max_subscriptions
Iterator = add_subscription(TopicFilter, SubOpts, ID), Subscription = add_subscription(TopicFilter, SubOpts, ID),
{ok, Session#{iterators := Iters#{TopicFilter => Iterator}}}. {ok, Session#{subscriptions := Subs#{TopicFilter => Subscription}}}.
-spec unsubscribe(topic(), session()) -> -spec unsubscribe(topic_filter(), session()) ->
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
unsubscribe( unsubscribe(
TopicFilter, TopicFilter,
Session = #{id := ID, iterators := Iters} Session = #{id := ID, subscriptions := Subs}
) when is_map_key(TopicFilter, Iters) -> ) when is_map_key(TopicFilter, Subs) ->
Iterator = maps:get(TopicFilter, Iters), Subscription = maps:get(TopicFilter, Subs),
SubOpts = maps:get(props, Iterator), SubOpts = maps:get(props, Subscription),
ok = del_subscription(TopicFilter, ID), ok = del_subscription(TopicFilter, ID),
{ok, Session#{iterators := maps:remove(TopicFilter, Iters)}, SubOpts}; {ok, Session#{subscriptions := maps:remove(TopicFilter, Subs)}, SubOpts};
unsubscribe( unsubscribe(
_TopicFilter, _TopicFilter,
_Session = #{} _Session = #{}
) -> ) ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}. {error, ?RC_NO_SUBSCRIPTION_EXISTED}.
-spec get_subscription(topic(), session()) -> -spec get_subscription(topic_filter(), session()) ->
emqx_types:subopts() | undefined. emqx_types:subopts() | undefined.
get_subscription(TopicFilter, #{iterators := Iters}) -> get_subscription(TopicFilter, #{subscriptions := Subs}) ->
case maps:get(TopicFilter, Iters, undefined) of case maps:get(TopicFilter, Subs, undefined) of
Iterator = #{} -> Subscription = #{} ->
maps:get(props, Iterator); maps:get(props, Subscription);
undefined -> undefined ->
undefined undefined
end. end.
@ -292,7 +292,7 @@ get_subscription(TopicFilter, #{iterators := Iters}) ->
{ok, emqx_types:publish_result(), replies(), session()} {ok, emqx_types:publish_result(), replies(), session()}
| {error, emqx_types:reason_code()}. | {error, emqx_types:reason_code()}.
publish(_PacketId, Msg, Session) -> publish(_PacketId, Msg, Session) ->
%% TODO: %% TODO: QoS2
Result = emqx_broker:publish(Msg), Result = emqx_broker:publish(Msg),
{ok, Result, [], Session}. {ok, Result, [], Session}.
@ -397,7 +397,7 @@ terminate(_Reason, _Session = #{}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec add_subscription(topic(), emqx_types:subopts(), id()) -> -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) ->
subscription(). subscription().
add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
%% N.B.: we chose to update the router before adding the subscription to the %% N.B.: we chose to update the router before adding the subscription to the
@ -427,7 +427,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
%% we'll list streams and open iterators when implementing message replay. %% we'll list streams and open iterators when implementing message replay.
DSSubExt. DSSubExt.
-spec update_subscription(topic(), subscription(), emqx_types:subopts(), id()) -> -spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) ->
subscription(). subscription().
update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) ->
TopicFilter = emqx_topic:words(TopicFilterBin), TopicFilter = emqx_topic:words(TopicFilterBin),
@ -437,7 +437,7 @@ update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) ->
ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}), ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}),
NDSSubExt. NDSSubExt.
-spec del_subscription(topic(), id()) -> -spec del_subscription(topic_filter(), id()) ->
ok. ok.
del_subscription(TopicFilterBin, DSSessionId) -> del_subscription(TopicFilterBin, DSSessionId) ->
TopicFilter = emqx_topic:words(TopicFilterBin), TopicFilter = emqx_topic:words(TopicFilterBin),
@ -522,7 +522,7 @@ storage() ->
%% Note: session API doesn't handle session takeovers, it's the job of %% Note: session API doesn't handle session takeovers, it's the job of
%% the broker. %% the broker.
-spec session_open(id()) -> -spec session_open(id()) ->
{ok, session(), #{topic() => subscription()}} | false. {ok, session(), #{topic_filter_words() => subscription()}} | false.
session_open(SessionId) -> session_open(SessionId) ->
transaction(fun() -> transaction(fun() ->
case mnesia:read(?SESSION_TAB, SessionId, write) of case mnesia:read(?SESSION_TAB, SessionId, write) of
@ -537,7 +537,7 @@ session_open(SessionId) ->
end). end).
-spec session_ensure_new(id(), _Props :: map()) -> -spec session_ensure_new(id(), _Props :: map()) ->
{ok, session(), #{topic() => subscription()}}. {ok, session(), #{topic_filter_words() => subscription()}}.
session_ensure_new(SessionId, Props) -> session_ensure_new(SessionId, Props) ->
transaction(fun() -> transaction(fun() ->
ok = session_drop_subscriptions(SessionId), ok = session_drop_subscriptions(SessionId),
@ -581,7 +581,7 @@ session_drop_subscriptions(DSSessionId) ->
). ).
%% @doc Called when a client subscribes to a topic. Idempotent. %% @doc Called when a client subscribes to a topic. Idempotent.
-spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> -spec session_add_subscription(id(), topic_filter_words(), _Props :: map()) ->
{ok, subscription(), _IsNew :: boolean()}. {ok, subscription(), _IsNew :: boolean()}.
session_add_subscription(DSSessionId, TopicFilter, Props) -> session_add_subscription(DSSessionId, TopicFilter, Props) ->
DSSubId = {DSSessionId, TopicFilter}, DSSubId = {DSSessionId, TopicFilter},
@ -606,7 +606,7 @@ session_add_subscription(DSSessionId, TopicFilter, Props) ->
end end
end). end).
-spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub(). -spec session_insert_subscription(id(), topic_filter_words(), map()) -> ds_sub().
session_insert_subscription(DSSessionId, TopicFilter, Props) -> session_insert_subscription(DSSessionId, TopicFilter, Props) ->
{DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter),
DSSub = #ds_sub{ DSSub = #ds_sub{
@ -641,7 +641,7 @@ session_read_subscriptions(DSSessionId) ->
), ),
mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. -spec new_subscription_id(id(), topic_filter_words()) -> {subscription_id(), integer()}.
new_subscription_id(DSSessionId, TopicFilter) -> new_subscription_id(DSSessionId, TopicFilter) ->
%% Note: here we use _milliseconds_ to match with the timestamp %% Note: here we use _milliseconds_ to match with the timestamp
%% field of `#message' record. %% field of `#message' record.
@ -688,7 +688,7 @@ renew_streams(DSSessionId) ->
Subscriptions Subscriptions
). ).
-spec renew_streams(id(), [ds_stream()], emqx_ds:topic_filter(), emqx_ds:time()) -> ok. -spec renew_streams(id(), [ds_stream()], topic_filter_words(), emqx_ds:time()) -> ok.
renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) -> renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) ->
AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
transaction( transaction(