From 45dad2ed3afd23070887d3041a5760e67e2f464f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 10 Nov 2023 17:06:16 -0300 Subject: [PATCH] feat(ds): implement session discard Fixes https://emqx.atlassian.net/browse/EMQX-9739 Fixes some issues to ensure the session is discarded when the client connects with `clean_start = true`, and adds cleanup of subscriptions/routes/iterators/streams. > There is an API that session garbage collector can use to perform cleaning. We already have `emqx_session:destroy/1`, which could serve as an API for a periodic session GC to use. --- .../emqx_persistent_session_ds_SUITE.erl | 97 ++++++++++++ apps/emqx/src/emqx_cm.erl | 6 +- .../emqx_persistent_message_ds_replayer.erl | 14 +- apps/emqx/src/emqx_persistent_session_ds.erl | 138 ++++++++++++++++-- apps/emqx/src/emqx_persistent_session_ds.hrl | 3 +- apps/emqx/src/emqx_session.erl | 10 +- apps/emqx/src/emqx_session_mem.erl | 2 + .../test/emqx_persistent_session_SUITE.erl | 1 + 8 files changed, 251 insertions(+), 20 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index ee5d203e4..f22a4f97e 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -11,6 +11,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/src/emqx_persistent_session_ds.hrl"). + -define(DEFAULT_KEYSPACE, default). -define(DS_SHARD_ID, <<"local">>). -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). @@ -118,6 +120,7 @@ start_client(Opts0 = #{}) -> properties => #{'Session-Expiry-Interval' => 300} }, Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)), + ct:pal("starting client with opts:\n ~p", [Opts]), {ok, Client} = emqtt:start_link(Opts), on_exit(fun() -> catch emqtt:stop(Client) end), Client. 
@@ -148,6 +151,9 @@ restart_node(Node, NodeSpec) -> ?tp(restarted_node, #{}), ok. +is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) -> + EI > 0. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -309,3 +315,94 @@ t_session_unsubscription_idempotency(Config) -> end ), ok. + +t_session_discard_persistent_to_non_persistent(_Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Params = #{ + client_id => ClientId, + reconnect_opts => + #{ + clean_start => true, + %% we set it to zero so that a new session is not created. + properties => #{'Session-Expiry-Interval' => 0}, + proto_ver => v5 + } + }, + do_t_session_discard(Params). + +t_session_discard_persistent_to_persistent(_Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + Params = #{ + client_id => ClientId, + reconnect_opts => + #{ + clean_start => true, + properties => #{'Session-Expiry-Interval' => 30}, + proto_ver => v5 + } + }, + do_t_session_discard(Params). + +do_t_session_discard(Params) -> + #{ + client_id := ClientId, + reconnect_opts := ReconnectOpts0 + } = Params, + ReconnectOpts = ReconnectOpts0#{clientid => ClientId}, + SubTopicFilter = <<"t/+">>, + ?check_trace( + begin + ?tp(notice, "starting", #{}), + Client0 = start_client(#{ + clientid => ClientId, + clean_start => false, + properties => #{'Session-Expiry-Interval' => 30}, + proto_ver => v5 + }), + {ok, _} = emqtt:connect(Client0), + ?tp(notice, "subscribing", #{}), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2), + %% Store some matching messages so that streams and iterators are created. 
+ ok = emqtt:publish(Client0, <<"t/1">>, <<"1">>), + ok = emqtt:publish(Client0, <<"t/2">>, <<"2">>), + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + true = map_size(emqx_persistent_session_ds:list_all_streams()) > 0 + ), + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + true = map_size(emqx_persistent_session_ds:list_all_iterators()) > 0 + ), + ok = emqtt:stop(Client0), + ?tp(notice, "disconnected", #{}), + + ?tp(notice, "reconnecting", #{}), + %% we still have iterators and streams + ?assert(map_size(emqx_persistent_session_ds:list_all_streams()) > 0), + ?assert(map_size(emqx_persistent_session_ds:list_all_iterators()) > 0), + Client1 = start_client(ReconnectOpts), + {ok, _} = emqtt:connect(Client1), + ?assertEqual([], emqtt:subscriptions(Client1)), + case is_persistent_connect_opts(ReconnectOpts) of + true -> + ?assertMatch(#{ClientId := _}, emqx_persistent_session_ds:list_all_sessions()); + false -> + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_sessions()) + end, + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()), + ?assertEqual([], emqx_persistent_session_ds_router:topics()), + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_streams()), + ?assertEqual(#{}, emqx_persistent_session_ds:list_all_iterators()), + ok = emqtt:stop(Client1), + ?tp(notice, "disconnected", #{}), + + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + ok + end + ), + ok. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 1e4940965..537c60876 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -258,21 +258,21 @@ set_chan_stats(ClientId, ChanPid, Stats) -> end. %% @doc Open a session. --spec open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) -> +-spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) -> {ok, #{ session := emqx_session:t(), present := boolean(), replay => _ReplayContext }} | {error, Reason :: term()}. 
-open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> ok = discard_session(ClientId), ok = emqx_session:destroy(ClientInfo, ConnInfo), create_register_session(ClientInfo, ConnInfo, Self) end); -open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), emqx_cm_locker:trans(ClientId, fun(_) -> case emqx_session:open(ClientInfo, ConnInfo) of diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 156aa943e..98bb069b0 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -188,8 +188,12 @@ fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) -> end. -spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok. -update_iterator(SessionId, Stream, Iterator) -> - mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}). +update_iterator(DSSessionId, Stream, Iterator) -> + %% Workaround: we convert `Stream' to a binary before attempting to store it in + %% mnesia(rocksdb) because of a bug in `mnesia_rocksdb' when trying to do + %% `mnesia:dirty_all_keys' later. + StreamBin = term_to_binary(Stream), + mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator}). get_last_iterator(SessionId, Stream, Ranges) -> case lists:keyfind(Stream, #range.stream, lists:reverse(Ranges)) of @@ -200,8 +204,10 @@ get_last_iterator(SessionId, Stream, Ranges) -> end. -spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator(). 
-get_iterator(SessionId, Stream) -> - Id = {SessionId, Stream}, +get_iterator(DSSessionId, Stream) -> + %% See comment in `update_iterator'. + StreamBin = term_to_binary(Stream), + Id = {DSSessionId, StreamBin}, [#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id), It. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 52c98c7d4..bc60a1277 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,6 +16,8 @@ -module(emqx_persistent_session_ds). +-behaviour(emqx_session). + -include("emqx.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -69,7 +71,13 @@ ]). -ifdef(TEST). --export([session_open/1]). +-export([ + session_open/1, + list_all_sessions/0, + list_all_subscriptions/0, + list_all_streams/0, + list_all_iterators/0 +]). -endif. %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be @@ -537,14 +545,24 @@ session_create(SessionId, Props) -> -spec session_drop(id()) -> ok. session_drop(DSSessionId) -> transaction(fun() -> - %% TODO: ensure all iterators from this clientid are closed? ok = session_drop_subscriptions(DSSessionId), + ok = session_drop_iterators(DSSessionId), + ok = session_drop_streams(DSSessionId), ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) end). +-spec session_drop_subscriptions(id()) -> ok. session_drop_subscriptions(DSSessionId) -> - IteratorRefs = session_read_subscriptions(DSSessionId), - ok = lists:foreach(fun session_del_subscription/1, IteratorRefs). 
+ Subscriptions = session_read_subscriptions(DSSessionId), + lists:foreach( + fun(#ds_sub{id = DSSubId} = DSSub) -> + TopicFilter = subscription_id_to_topic_filter(DSSubId), + TopicFilterBin = emqx_topic:join(TopicFilter), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId), + ok = session_del_subscription(DSSub) + end, + Subscriptions + ). %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> @@ -615,6 +633,10 @@ new_subscription_id(DSSessionId, TopicFilter) -> DSSubId = {DSSessionId, TopicFilter}, {DSSubId, NowMS}. +-spec subscription_id_to_topic_filter(subscription_id()) -> topic_filter(). +subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) -> + TopicFilter. + %%-------------------------------------------------------------------- %% RPC targets (v1) %%-------------------------------------------------------------------- @@ -639,24 +661,26 @@ do_ensure_all_iterators_closed(_DSSessionID) -> %% Reading batches %%-------------------------------------------------------------------- -renew_streams(Id) -> - Subscriptions = ro_transaction(fun() -> session_read_subscriptions(Id) end), - ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, Id) end), +-spec renew_streams(id()) -> ok. +renew_streams(DSSessionId) -> + Subscriptions = ro_transaction(fun() -> session_read_subscriptions(DSSessionId) end), + ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, DSSessionId) end), lists:foreach( fun(#ds_sub{id = {_, TopicFilter}, start_time = StartTime}) -> - renew_streams(Id, ExistingStreams, TopicFilter, StartTime) + renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) end, Subscriptions ). -renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> +-spec renew_streams(id(), [ds_stream()], emqx_ds:topic_filter(), emqx_ds:time()) -> ok. 
+renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) -> AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), transaction( fun() -> lists:foreach( fun({Rank, Stream}) -> Rec = #ds_stream{ - session = Id, + session = DSSessionId, topic_filter = TopicFilter, stream = Stream, rank = Rank @@ -669,7 +693,12 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> {ok, Iterator} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), - IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator}, + %% Workaround: we convert `Stream' to a binary before + %% attempting to store it in mnesia(rocksdb) because of a bug + %% in `mnesia_rocksdb' when trying to do + %% `mnesia:dirty_all_keys' later. + StreamBin = term_to_binary(Stream), + IterRec = #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator}, mnesia:write(?SESSION_ITER_TAB, IterRec, write) end end, @@ -678,6 +707,33 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) -> end ). +%% must be called inside a transaction +-spec session_drop_streams(id()) -> ok. +session_drop_streams(DSSessionId) -> + MS = ets:fun2ms( + fun(#ds_stream{session = DSSessionId0}) when DSSessionId0 =:= DSSessionId -> + DSSessionId0 + end + ), + StreamIDs = mnesia:select(?SESSION_STREAM_TAB, MS, write), + lists:foreach(fun(Key) -> mnesia:delete(?SESSION_STREAM_TAB, Key, write) end, StreamIDs). + +%% must be called inside a transaction +-spec session_drop_iterators(id()) -> ok. +session_drop_iterators(DSSessionId) -> + MS = ets:fun2ms( + fun(#ds_iter{id = {DSSessionId0, StreamBin}}) when DSSessionId0 =:= DSSessionId -> + StreamBin + end + ), + StreamBins = mnesia:select(?SESSION_ITER_TAB, MS, write), + lists:foreach( + fun(StreamBin) -> + mnesia:delete(?SESSION_ITER_TAB, {DSSessionId, StreamBin}, write) + end, + StreamBins + ). 
+ %%-------------------------------------------------------------------------------- transaction(Fun) -> @@ -724,3 +780,63 @@ ensure_timer(Type) -> ensure_timer(Type, Timeout) -> _ = emqx_utils:start_timer(Timeout, {emqx_session, Type}), ok. + +-ifdef(TEST). +list_all_sessions() -> + DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), + Sessions = lists:map( + fun(SessionID) -> + {ok, Session, Subscriptions} = session_open(SessionID), + {SessionID, #{session => Session, subscriptions => Subscriptions}} + end, + DSSessionIds + ), + maps:from_list(Sessions). + +list_all_subscriptions() -> + DSSubIds = mnesia:dirty_all_keys(?SESSION_SUBSCRIPTIONS_TAB), + Subscriptions = lists:map( + fun(DSSubId) -> + [DSSub] = mnesia:dirty_read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId), + {DSSubId, export_subscription(DSSub)} + end, + DSSubIds + ), + maps:from_list(Subscriptions). + +list_all_streams() -> + DSStreamIds = mnesia:dirty_all_keys(?SESSION_STREAM_TAB), + DSStreams = lists:map( + fun(DSStreamId) -> + Records = mnesia:dirty_read(?SESSION_STREAM_TAB, DSStreamId), + ExtDSStreams = + lists:map( + fun(Record) -> + export_record( + Record, + #ds_stream.session, + [session, topic_filter, stream, rank], + #{} + ) + end, + Records + ), + {DSStreamId, ExtDSStreams} + end, + DSStreamIds + ), + maps:from_list(DSStreams). + +list_all_iterators() -> + DSIterIds = mnesia:dirty_all_keys(?SESSION_ITER_TAB), + DSIters = lists:map( + fun(DSIterId) -> + [Record] = mnesia:dirty_read(?SESSION_ITER_TAB, DSIterId), + {DSIterId, export_record(Record, #ds_iter.id, [id, iter], #{})} + end, + DSIterIds + ), + maps:from_list(DSIters). + +%% ifdef(TEST) +-endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 666874608..cc995ce66 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -39,9 +39,10 @@ rank :: emqx_ds:stream_rank() }). -type ds_stream() :: #ds_stream{}. 
+-type ds_stream_bin() :: binary(). -record(ds_iter, { - id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()}, + id :: {emqx_persistent_session_ds:id(), ds_stream_bin()}, iter :: emqx_ds:iterator() }). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8bdd47392..52342d7ee 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -176,6 +176,7 @@ t(). -callback open(clientinfo(), conninfo()) -> {_IsPresent :: true, t(), _ReplayContext} | false. +-callback destroy(t() | clientinfo()) -> ok. %%-------------------------------------------------------------------- %% Create a Session @@ -247,7 +248,14 @@ get_mqtt_conf(Zone, Key) -> -spec destroy(clientinfo(), conninfo()) -> ok. destroy(ClientInfo, ConnInfo) -> - (choose_impl_mod(ConnInfo)):destroy(ClientInfo). + %% When destroying/discarding a session, the current `ClientInfo' might suggest an + %% implementation which does not correspond to the one previously used by this client. + %% An example of this is a client that first connects with `Session-Expiry-Interval' > + %% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' = + %% true. So we may simply destroy sessions from all implementations, since the key + %% (ClientID) is the same. + Mods = choose_impl_candidates(ConnInfo), + lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods). -spec destroy(t()) -> ok. destroy(Session) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index e72feffd5..3ea4f9f3b 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -44,6 +44,8 @@ %% State is stored in-memory in the process heap. -module(emqx_session_mem). +-behaviour(emqx_session). + -include("emqx.hrl"). -include("emqx_mqtt.hrl"). -include("emqx_session_mem.hrl"). 
diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 0f8929e23..bd7ca1c46 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -599,6 +599,7 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client2). +%% TODO: don't skip after QoS2 support is added to DS. t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config); t_clean_start_drops_subscriptions('end', _Config) -> ok. t_clean_start_drops_subscriptions(Config) ->