fix(sessds): convert ds iterator topics upon opening ds session

This commit is contained in:
Andrew Mayorov 2023-09-19 14:57:12 +04:00
parent 045d8b7f10
commit 7a9916c84d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 24 additions and 12 deletions

View File

@ -245,7 +245,7 @@ t_session_subscription_idempotency(Config) ->
?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)), ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)),
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
?assertMatch( ?assertMatch(
{_IsNew = false, #{}}, {_IsNew = false, #{}, #{SubTopicFilterWords := #{}}},
erpc:call(Node1, emqx_ds, session_open, [ClientId, #{}]) erpc:call(Node1, emqx_ds, session_open, [ClientId, #{}])
) )
end end

View File

@ -98,7 +98,7 @@
session(). session().
create(#{clientid := ClientID}, _ConnInfo, Conf) -> create(#{clientid := ClientID}, _ConnInfo, Conf) ->
% TODO: expiration % TODO: expiration
{true, Session} = emqx_ds:session_open(ClientID, Conf), {true, Session} = open_session(ClientID, Conf),
Session. Session.
-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> -spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
@ -111,7 +111,7 @@ open(#{clientid := ClientID}, _ConnInfo, Conf) ->
% somehow isolate those idling not-yet-expired sessions into a separate process % somehow isolate those idling not-yet-expired sessions into a separate process
% space, and move this call back into `emqx_cm` where it belongs. % space, and move this call back into `emqx_cm` where it belongs.
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
{IsNew, Session} = emqx_ds:session_open(ClientID, Conf), {IsNew, Session} = open_session(ClientID, Conf),
IsPresent = not IsNew, IsPresent = not IsNew,
case IsPresent of case IsPresent of
true -> true ->
@ -120,6 +120,16 @@ open(#{clientid := ClientID}, _ConnInfo, Conf) ->
{IsPresent, Session} {IsPresent, Session}
end. end.
open_session(ClientID, Conf) ->
{IsNew, Session, Iterators} = emqx_ds:session_open(ClientID, Conf),
{IsNew, Session#{
iterators => maps:fold(
fun(Topic, Iterator, Acc) -> Acc#{emqx_topic:join(Topic) => Iterator} end,
#{},
Iterators
)
}}.
-spec destroy(session() | clientinfo()) -> ok. -spec destroy(session() | clientinfo()) -> ok.
destroy(#{id := ClientID}) -> destroy(#{id := ClientID}) ->
emqx_ds:session_drop(ClientID); emqx_ds:session_drop(ClientID);
@ -219,7 +229,7 @@ unsubscribe(
) -> ) ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}. {error, ?RC_NO_SUBSCRIPTION_EXISTED}.
-spec get_subscription(emqx_types:topic(), session()) -> -spec get_subscription(topic(), session()) ->
emqx_types:subopts() | undefined. emqx_types:subopts() | undefined.
get_subscription(TopicFilter, #{iterators := Iters}) -> get_subscription(TopicFilter, #{iterators := Iters}) ->
case maps:get(TopicFilter, Iters, undefined) of case maps:get(TopicFilter, Iters, undefined) of

View File

@ -66,10 +66,11 @@
id := emqx_ds:session_id(), id := emqx_ds:session_id(),
created_at := _Millisecond :: non_neg_integer(), created_at := _Millisecond :: non_neg_integer(),
expires_at := _Millisecond :: non_neg_integer() | never, expires_at := _Millisecond :: non_neg_integer() | never,
iterators := map(),
props := map() props := map()
}. }.
-type iterators() :: #{topic() => iterator()}.
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
%% an atom, in theory (?). %% an atom, in theory (?).
-type session_id() :: binary(). -type session_id() :: binary().
@ -102,7 +103,7 @@
-type replay_id() :: binary(). -type replay_id() :: binary().
-type replay() :: { -type replay() :: {
_TopicFilter :: emqx_topic:words(), _TopicFilter :: topic(),
_StartTime :: time() _StartTime :: time()
}. }.
@ -151,7 +152,8 @@ message_stats() ->
%% %%
%% Note: session API doesn't handle session takeovers, it's the job of %% Note: session API doesn't handle session takeovers, it's the job of
%% the broker. %% the broker.
-spec session_open(session_id(), _Props :: map()) -> {_New :: boolean(), session()}. -spec session_open(session_id(), _Props :: map()) ->
{_New :: boolean(), session(), iterators()}.
session_open(SessionId, Props) -> session_open(SessionId, Props) ->
transaction(fun() -> transaction(fun() ->
case mnesia:read(?SESSION_TAB, SessionId, write) of case mnesia:read(?SESSION_TAB, SessionId, write) of
@ -159,10 +161,10 @@ session_open(SessionId, Props) ->
Session = export_record(Record), Session = export_record(Record),
IteratorRefs = session_read_iterators(SessionId), IteratorRefs = session_read_iterators(SessionId),
Iterators = export_iterators(IteratorRefs), Iterators = export_iterators(IteratorRefs),
{false, Session#{iterators => Iterators}}; {false, Session, Iterators};
[] -> [] ->
Session = export_record(session_create(SessionId, Props)), Session = export_record(session_create(SessionId, Props)),
{true, Session#{iterators => #{}}} {true, Session, #{}}
end end
end). end).
@ -195,7 +197,7 @@ session_suspend(_SessionId) ->
ok. ok.
%% @doc Called when a client subscribes to a topic. Idempotent. %% @doc Called when a client subscribes to a topic. Idempotent.
-spec session_add_iterator(session_id(), emqx_topic:words(), _Props :: map()) -> -spec session_add_iterator(session_id(), topic(), _Props :: map()) ->
{ok, iterator(), _IsNew :: boolean()}. {ok, iterator(), _IsNew :: boolean()}.
session_add_iterator(DSSessionId, TopicFilter, Props) -> session_add_iterator(DSSessionId, TopicFilter, Props) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
@ -236,7 +238,7 @@ session_update_iterator(IteratorRef, Props) ->
ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write),
NIteratorRef. NIteratorRef.
-spec session_get_iterator_id(session_id(), emqx_topic:words()) -> -spec session_get_iterator_id(session_id(), topic()) ->
{ok, iterator_id()} | {error, not_found}. {ok, iterator_id()} | {error, not_found}.
session_get_iterator_id(DSSessionId, TopicFilter) -> session_get_iterator_id(DSSessionId, TopicFilter) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
@ -248,7 +250,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) ->
end. end.
%% @doc Called when a client unsubscribes from a topic. %% @doc Called when a client unsubscribes from a topic.
-spec session_del_iterator(session_id(), emqx_topic:words()) -> ok. -spec session_del_iterator(session_id(), topic()) -> ok.
session_del_iterator(DSSessionId, TopicFilter) -> session_del_iterator(DSSessionId, TopicFilter) ->
IteratorRefId = {DSSessionId, TopicFilter}, IteratorRefId = {DSSessionId, TopicFilter},
transaction(fun() -> transaction(fun() ->