From 5b40304d1fbe938e36234f05f4bfb749cc7c6dbc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 20 Nov 2023 13:25:24 +0700 Subject: [PATCH] chore(sessds): simplify subscriptions handling There's currently no point in storing parsed topic filters in the subscriptions table. --- .../emqx_persistent_session_ds_SUITE.erl | 17 +---- apps/emqx/src/emqx_persistent_session_ds.erl | 69 +++++++------------ 2 files changed, 28 insertions(+), 58 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index f22a4f97e..72775228c 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -11,12 +11,6 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx/src/emqx_persistent_session_ds.hrl"). - --define(DEFAULT_KEYSPACE, default). --define(DS_SHARD_ID, <<"local">>). --define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). - -import(emqx_common_test_helpers, [on_exit/1]). %%------------------------------------------------------------------------------ @@ -92,12 +86,6 @@ get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -get_all_iterator_ids(Node) -> - Fn = fun(K, _V, Acc) -> [K | Acc] end, - erpc:call(Node, fun() -> - emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) - end). - wait_nodeup(Node) -> ?retry( _Sleep0 = 500, @@ -233,9 +221,8 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - SubTopicFilterWords = emqx_topic:words(SubTopicFilter), ?assertMatch( - {ok, #{}, #{SubTopicFilterWords := #{}}}, + #{subscriptions := #{SubTopicFilter := #{}}}, erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) ) end @@ -308,7 +295,7 @@ t_session_unsubscription_idempotency(Config) -> fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), ?assertMatch( - {ok, #{}, Subs = #{}} when map_size(Subs) =:= 0, + #{subscriptions := Subs = #{}} when map_size(Subs) =:= 0, erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) ), ok diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index ca3fc3514..3a7232747 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -142,7 +142,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - case open_session(ClientID) of + case session_open(ClientID) of Session0 = #{} -> ensure_timers(), ReceiveMaximum = receive_maximum(ConnInfo), @@ -153,24 +153,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> end. ensure_session(ClientID, ConnInfo, Conf) -> - {ok, Session, #{}} = session_ensure_new(ClientID, Conf), + Session = session_ensure_new(ClientID, Conf), ReceiveMaximum = receive_maximum(ConnInfo), - Session#{iterators => #{}, receive_maximum => ReceiveMaximum}. - -open_session(ClientID) -> - case session_open(ClientID) of - {ok, Session, Subscriptions} -> - Session#{iterators => prep_subscriptions(Subscriptions)}; - false -> - false - end. - -prep_subscriptions(Subscriptions) -> - maps:fold( - fun(Topic, Subscription, Acc) -> Acc#{emqx_topic:join(Topic) => Subscription} end, - #{}, - Subscriptions - ). + Session#{subscriptions => #{}, receive_maximum => ReceiveMaximum}. -spec destroy(session() | clientinfo()) -> ok. destroy(#{id := ClientID}) -> @@ -392,14 +377,13 @@ disconnect(Session = #{}) -> -spec terminate(Reason :: term(), session()) -> ok. terminate(_Reason, _Session = #{}) -> - % TODO: close iterators ok. %%-------------------------------------------------------------------- -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> subscription(). -add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> +add_subscription(TopicFilter, SubOpts, DSSessionID) -> %% N.B.: we chose to update the router before adding the subscription to the %% session/iterator table. The reasoning for this is as follows: %% @@ -418,8 +402,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> %% since it is guarded by a transaction context: we consider a subscription %% operation to be successful if it ended up changing this table. Both router %% and iterator information can be reconstructed from this table, if needed. - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), - TopicFilter = emqx_topic:words(TopicFilterBin), + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, DSSessionID), {ok, DSSubExt, IsNew} = session_add_subscription( DSSessionID, TopicFilter, SubOpts ), @@ -429,8 +412,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> -spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) -> subscription(). -update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> - TopicFilter = emqx_topic:words(TopicFilterBin), +update_subscription(TopicFilter, DSSubExt, SubOpts, DSSessionID) -> {ok, NDSSubExt, false} = session_add_subscription( DSSessionID, TopicFilter, SubOpts ), @@ -439,8 +421,8 @@ update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> -spec del_subscription(topic_filter(), id()) -> ok. -del_subscription(TopicFilterBin, DSSessionId) -> - TopicFilter = emqx_topic:words(TopicFilterBin), +del_subscription(TopicFilter, DSSessionId) -> + %% TODO: transaction? ?tp_span( persistent_session_ds_subscription_delete, #{session_id => DSSessionId}, @@ -449,7 +431,7 @@ del_subscription(TopicFilterBin, DSSessionId) -> ?tp_span( persistent_session_ds_subscription_route_delete, #{session_id => DSSessionId}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId) + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId) ). %%-------------------------------------------------------------------- @@ -522,27 +504,33 @@ storage() -> %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. -spec session_open(id()) -> - {ok, session(), #{topic_filter_words() => subscription()}} | false. + session() | false. session_open(SessionId) -> - transaction(fun() -> + ro_transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of [Record = #session{}] -> Session = export_session(Record), DSSubs = session_read_subscriptions(SessionId), Subscriptions = export_subscriptions(DSSubs), - {ok, Session, Subscriptions}; + Session#{ + subscriptions => Subscriptions, + inflight => emqx_persistent_message_ds_replayer:new() + }; [] -> false end end). -spec session_ensure_new(id(), _Props :: map()) -> - {ok, session(), #{topic_filter_words() => subscription()}}. + session(). session_ensure_new(SessionId, Props) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), Session = export_session(session_create(SessionId, Props)), - {ok, Session, #{}} + Session#{ + subscriptions => #{}, + inflight => emqx_persistent_message_ds_replayer:new() + } end). session_create(SessionId, Props) -> @@ -550,8 +538,7 @@ session_create(SessionId, Props) -> id = SessionId, created_at = erlang:system_time(millisecond), expires_at = never, - props = Props, - inflight = emqx_persistent_message_ds_replayer:new() + props = Props }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. @@ -573,15 +560,14 @@ session_drop_subscriptions(DSSessionId) -> lists:foreach( fun(#ds_sub{id = DSSubId} = DSSub) -> TopicFilter = subscription_id_to_topic_filter(DSSubId), - TopicFilterBin = emqx_topic:join(TopicFilter), - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId), ok = session_del_subscription(DSSub) end, Subscriptions ). %% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_subscription(id(), topic_filter_words(), _Props :: map()) -> +-spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> {ok, subscription(), _IsNew :: boolean()}. session_add_subscription(DSSessionId, TopicFilter, Props) -> DSSubId = {DSSessionId, TopicFilter}, @@ -606,7 +592,7 @@ session_add_subscription(DSSessionId, TopicFilter, Props) -> end end). --spec session_insert_subscription(id(), topic_filter_words(), map()) -> ds_sub(). +-spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub(). session_insert_subscription(DSSessionId, TopicFilter, Props) -> {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), DSSub = #ds_sub{ @@ -641,7 +627,7 @@ session_read_subscriptions(DSSessionId) -> ), mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). --spec new_subscription_id(id(), topic_filter_words()) -> {subscription_id(), integer()}. +-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. new_subscription_id(DSSessionId, TopicFilter) -> %% Note: here we use _milliseconds_ to match with the timestamp %% field of `#message' record. @@ -808,10 +794,7 @@ receive_maximum(ConnInfo) -> list_all_sessions() -> DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), Sessions = lists:map( - fun(SessionID) -> - {ok, Session, Subscriptions} = session_open(SessionID), - {SessionID, #{session => Session, subscriptions => Subscriptions}} - end, + fun(SessionID) -> {SessionID, session_open(SessionID)} end, DSSessionIds ), maps:from_list(Sessions).