From 7a9916c84dd864a56927c99e0e030c514d6153f0 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Sep 2023 14:57:12 +0400 Subject: [PATCH] fix(sessds): convert ds iterator topics upon opening ds session --- apps/emqx/integration_test/emqx_ds_SUITE.erl | 2 +- apps/emqx/src/emqx_persistent_session_ds.erl | 16 +++++++++++++--- apps/emqx_durable_storage/src/emqx_ds.erl | 18 ++++++++++-------- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/apps/emqx/integration_test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl index 4e2103c45..a35790897 100644 --- a/apps/emqx/integration_test/emqx_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_ds_SUITE.erl @@ -245,7 +245,7 @@ t_session_subscription_idempotency(Config) -> ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( - {_IsNew = false, #{}}, + {_IsNew = false, #{}, #{SubTopicFilterWords := #{}}}, erpc:call(Node1, emqx_ds, session_open, [ClientId, #{}]) ) end diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b7f12dc93..6060aa4a4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -98,7 +98,7 @@ session(). create(#{clientid := ClientID}, _ConnInfo, Conf) -> % TODO: expiration - {true, Session} = emqx_ds:session_open(ClientID, Conf), + {true, Session} = open_session(ClientID, Conf), Session. -spec open(clientinfo(), conninfo(), emqx_session:conf()) -> @@ -111,7 +111,7 @@ open(#{clientid := ClientID}, _ConnInfo, Conf) -> % somehow isolate those idling not-yet-expired sessions into a separate process % space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - {IsNew, Session} = emqx_ds:session_open(ClientID, Conf), + {IsNew, Session} = open_session(ClientID, Conf), IsPresent = not IsNew, case IsPresent of true -> @@ -120,6 +120,16 @@ open(#{clientid := ClientID}, _ConnInfo, Conf) -> {IsPresent, Session} end. +open_session(ClientID, Conf) -> + {IsNew, Session, Iterators} = emqx_ds:session_open(ClientID, Conf), + {IsNew, Session#{ + iterators => maps:fold( + fun(Topic, Iterator, Acc) -> Acc#{emqx_topic:join(Topic) => Iterator} end, + #{}, + Iterators + ) + }}. + -spec destroy(session() | clientinfo()) -> ok. destroy(#{id := ClientID}) -> emqx_ds:session_drop(ClientID); @@ -219,7 +229,7 @@ unsubscribe( ) -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}. --spec get_subscription(emqx_types:topic(), session()) -> +-spec get_subscription(topic(), session()) -> emqx_types:subopts() | undefined. get_subscription(TopicFilter, #{iterators := Iters}) -> case maps:get(TopicFilter, Iters, undefined) of diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 8f8510e55..62d6369ea 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -66,10 +66,11 @@ id := emqx_ds:session_id(), created_at := _Millisecond :: non_neg_integer(), expires_at := _Millisecond :: non_neg_integer() | never, - iterators := map(), props := map() }. +-type iterators() :: #{topic() => iterator()}. + %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type session_id() :: binary(). @@ -102,7 +103,7 @@ -type replay_id() :: binary(). -type replay() :: { - _TopicFilter :: emqx_topic:words(), + _TopicFilter :: topic(), _StartTime :: time() }. @@ -151,7 +152,8 @@ message_stats() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(session_id(), _Props :: map()) -> {_New :: boolean(), session()}. +-spec session_open(session_id(), _Props :: map()) -> + {_New :: boolean(), session(), iterators()}. session_open(SessionId, Props) -> transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of @@ -159,10 +161,10 @@ session_open(SessionId, Props) -> Session = export_record(Record), IteratorRefs = session_read_iterators(SessionId), Iterators = export_iterators(IteratorRefs), - {false, Session#{iterators => Iterators}}; + {false, Session, Iterators}; [] -> Session = export_record(session_create(SessionId, Props)), - {true, Session#{iterators => #{}}} + {true, Session, #{}} end end). @@ -195,7 +197,7 @@ session_suspend(_SessionId) -> ok. %% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_iterator(session_id(), emqx_topic:words(), _Props :: map()) -> +-spec session_add_iterator(session_id(), topic(), _Props :: map()) -> {ok, iterator(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter, Props) -> IteratorRefId = {DSSessionId, TopicFilter}, @@ -236,7 +238,7 @@ session_update_iterator(IteratorRef, Props) -> ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), NIteratorRef. --spec session_get_iterator_id(session_id(), emqx_topic:words()) -> +-spec session_get_iterator_id(session_id(), topic()) -> {ok, iterator_id()} | {error, not_found}. session_get_iterator_id(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, @@ -248,7 +250,7 @@ session_get_iterator_id(DSSessionId, TopicFilter) -> end. %% @doc Called when a client unsubscribes from a topic. --spec session_del_iterator(session_id(), emqx_topic:words()) -> ok. +-spec session_del_iterator(session_id(), topic()) -> ok. session_del_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, transaction(fun() ->