feat(ds): implement session discard

Fixes https://emqx.atlassian.net/browse/EMQX-9739

Fixes some issues to ensure the session is discarded when the client connects with
`clean_start = true`, and added some cleanup to subscriptions/routes/iterators/streams.

> There is an API that session garbage collector can use to perform cleaning

We already have `emqx_session:destroy/1`, which could serve as an API for a periodic
session GC to use.
This commit is contained in:
Thales Macedo Garitezi 2023-11-10 17:06:16 -03:00
parent 3537d688bc
commit 45dad2ed3a
8 changed files with 251 additions and 20 deletions

View File

@ -11,6 +11,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/src/emqx_persistent_session_ds.hrl").
-define(DEFAULT_KEYSPACE, default).
-define(DS_SHARD_ID, <<"local">>).
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
@ -118,6 +120,7 @@ start_client(Opts0 = #{}) ->
properties => #{'Session-Expiry-Interval' => 300}
},
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
ct:pal("starting client with opts:\n ~p", [Opts]),
{ok, Client} = emqtt:start_link(Opts),
on_exit(fun() -> catch emqtt:stop(Client) end),
Client.
@ -148,6 +151,9 @@ restart_node(Node, NodeSpec) ->
?tp(restarted_node, #{}),
ok.
is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) ->
EI > 0.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -309,3 +315,94 @@ t_session_unsubscription_idempotency(Config) ->
end
),
ok.
t_session_discard_persistent_to_non_persistent(_Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
Params = #{
client_id => ClientId,
reconnect_opts =>
#{
clean_start => true,
%% we set it to zero so that a new session is not created.
properties => #{'Session-Expiry-Interval' => 0},
proto_ver => v5
}
},
do_t_session_discard(Params).
t_session_discard_persistent_to_persistent(_Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
Params = #{
client_id => ClientId,
reconnect_opts =>
#{
clean_start => true,
properties => #{'Session-Expiry-Interval' => 30},
proto_ver => v5
}
},
do_t_session_discard(Params).
do_t_session_discard(Params) ->
#{
client_id := ClientId,
reconnect_opts := ReconnectOpts0
} = Params,
ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
SubTopicFilter = <<"t/+">>,
?check_trace(
begin
?tp(notice, "starting", #{}),
Client0 = start_client(#{
clientid => ClientId,
clean_start => false,
properties => #{'Session-Expiry-Interval' => 30},
proto_ver => v5
}),
{ok, _} = emqtt:connect(Client0),
?tp(notice, "subscribing", #{}),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
%% Store some matching messages so that streams and iterators are created.
ok = emqtt:publish(Client0, <<"t/1">>, <<"1">>),
ok = emqtt:publish(Client0, <<"t/2">>, <<"2">>),
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = map_size(emqx_persistent_session_ds:list_all_streams()) > 0
),
?retry(
_Sleep0 = 100,
_Attempts0 = 50,
true = map_size(emqx_persistent_session_ds:list_all_iterators()) > 0
),
ok = emqtt:stop(Client0),
?tp(notice, "disconnected", #{}),
?tp(notice, "reconnecting", #{}),
%% we still have iterators and streams
?assert(map_size(emqx_persistent_session_ds:list_all_streams()) > 0),
?assert(map_size(emqx_persistent_session_ds:list_all_iterators()) > 0),
Client1 = start_client(ReconnectOpts),
{ok, _} = emqtt:connect(Client1),
?assertEqual([], emqtt:subscriptions(Client1)),
case is_persistent_connect_opts(ReconnectOpts) of
true ->
?assertMatch(#{ClientId := _}, emqx_persistent_session_ds:list_all_sessions());
false ->
?assertEqual(#{}, emqx_persistent_session_ds:list_all_sessions())
end,
?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
?assertEqual([], emqx_persistent_session_ds_router:topics()),
?assertEqual(#{}, emqx_persistent_session_ds:list_all_streams()),
?assertEqual(#{}, emqx_persistent_session_ds:list_all_iterators()),
ok = emqtt:stop(Client1),
?tp(notice, "disconnected", #{}),
ok
end,
fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
ok
end
),
ok.

View File

@ -258,21 +258,21 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
end.
%% @doc Open a session.
-spec open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) ->
-spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) ->
{ok, #{
session := emqx_session:t(),
present := boolean(),
replay => _ReplayContext
}}
| {error, Reason :: term()}.
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
emqx_cm_locker:trans(ClientId, fun(_) ->
ok = discard_session(ClientId),
ok = emqx_session:destroy(ClientInfo, ConnInfo),
create_register_session(ClientInfo, ConnInfo, Self)
end);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
emqx_cm_locker:trans(ClientId, fun(_) ->
case emqx_session:open(ClientInfo, ConnInfo) of

View File

@ -188,8 +188,12 @@ fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
end.
-spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok.
update_iterator(SessionId, Stream, Iterator) ->
mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}).
update_iterator(DSSessionId, Stream, Iterator) ->
%% Workaround: we convert `Stream' to a binary before attempting to store it in
%% mnesia(rocksdb) because of a bug in `mnesia_rocksdb' when trying to do
%% `mnesia:dirty_all_keys' later.
StreamBin = term_to_binary(Stream),
mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator}).
get_last_iterator(SessionId, Stream, Ranges) ->
case lists:keyfind(Stream, #range.stream, lists:reverse(Ranges)) of
@ -200,8 +204,10 @@ get_last_iterator(SessionId, Stream, Ranges) ->
end.
-spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator().
get_iterator(SessionId, Stream) ->
Id = {SessionId, Stream},
get_iterator(DSSessionId, Stream) ->
%% See comment in `update_iterator'.
StreamBin = term_to_binary(Stream),
Id = {DSSessionId, StreamBin},
[#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id),
It.

View File

@ -16,6 +16,8 @@
-module(emqx_persistent_session_ds).
-behaviour(emqx_session).
-include("emqx.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
@ -69,7 +71,13 @@
]).
-ifdef(TEST).
-export([session_open/1]).
-export([
session_open/1,
list_all_sessions/0,
list_all_subscriptions/0,
list_all_streams/0,
list_all_iterators/0
]).
-endif.
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
@ -537,14 +545,24 @@ session_create(SessionId, Props) ->
-spec session_drop(id()) -> ok.
session_drop(DSSessionId) ->
transaction(fun() ->
%% TODO: ensure all iterators from this clientid are closed?
ok = session_drop_subscriptions(DSSessionId),
ok = session_drop_iterators(DSSessionId),
ok = session_drop_streams(DSSessionId),
ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
end).
-spec session_drop_subscriptions(id()) -> ok.
session_drop_subscriptions(DSSessionId) ->
IteratorRefs = session_read_subscriptions(DSSessionId),
ok = lists:foreach(fun session_del_subscription/1, IteratorRefs).
Subscriptions = session_read_subscriptions(DSSessionId),
lists:foreach(
fun(#ds_sub{id = DSSubId} = DSSub) ->
TopicFilter = subscription_id_to_topic_filter(DSSubId),
TopicFilterBin = emqx_topic:join(TopicFilter),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId),
ok = session_del_subscription(DSSub)
end,
Subscriptions
).
%% @doc Called when a client subscribes to a topic. Idempotent.
-spec session_add_subscription(id(), topic_filter(), _Props :: map()) ->
@ -615,6 +633,10 @@ new_subscription_id(DSSessionId, TopicFilter) ->
DSSubId = {DSSessionId, TopicFilter},
{DSSubId, NowMS}.
-spec subscription_id_to_topic_filter(subscription_id()) -> topic_filter().
subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) ->
TopicFilter.
%%--------------------------------------------------------------------
%% RPC targets (v1)
%%--------------------------------------------------------------------
@ -639,24 +661,26 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
%% Reading batches
%%--------------------------------------------------------------------
renew_streams(Id) ->
Subscriptions = ro_transaction(fun() -> session_read_subscriptions(Id) end),
ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, Id) end),
-spec renew_streams(id()) -> ok.
renew_streams(DSSessionId) ->
Subscriptions = ro_transaction(fun() -> session_read_subscriptions(DSSessionId) end),
ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, DSSessionId) end),
lists:foreach(
fun(#ds_sub{id = {_, TopicFilter}, start_time = StartTime}) ->
renew_streams(Id, ExistingStreams, TopicFilter, StartTime)
renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime)
end,
Subscriptions
).
renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
-spec renew_streams(id(), [ds_stream()], emqx_ds:topic_filter(), emqx_ds:time()) -> ok.
renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) ->
AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
transaction(
fun() ->
lists:foreach(
fun({Rank, Stream}) ->
Rec = #ds_stream{
session = Id,
session = DSSessionId,
topic_filter = TopicFilter,
stream = Stream,
rank = Rank
@ -669,7 +693,12 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
{ok, Iterator} = emqx_ds:make_iterator(
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
),
IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
%% Workaround: we convert `Stream' to a binary before
%% attempting to store it in mnesia(rocksdb) because of a bug
%% in `mnesia_rocksdb' when trying to do
%% `mnesia:dirty_all_keys' later.
StreamBin = term_to_binary(Stream),
IterRec = #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator},
mnesia:write(?SESSION_ITER_TAB, IterRec, write)
end
end,
@ -678,6 +707,33 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
end
).
%% must be called inside a transaction
-spec session_drop_streams(id()) -> ok.
session_drop_streams(DSSessionId) ->
MS = ets:fun2ms(
fun(#ds_stream{session = DSSessionId0}) when DSSessionId0 =:= DSSessionId ->
DSSessionId0
end
),
StreamIDs = mnesia:select(?SESSION_STREAM_TAB, MS, write),
lists:foreach(fun(Key) -> mnesia:delete(?SESSION_STREAM_TAB, Key, write) end, StreamIDs).
%% must be called inside a transaction
-spec session_drop_iterators(id()) -> ok.
session_drop_iterators(DSSessionId) ->
MS = ets:fun2ms(
fun(#ds_iter{id = {DSSessionId0, StreamBin}}) when DSSessionId0 =:= DSSessionId ->
StreamBin
end
),
StreamBins = mnesia:select(?SESSION_ITER_TAB, MS, write),
lists:foreach(
fun(StreamBin) ->
mnesia:delete(?SESSION_ITER_TAB, {DSSessionId, StreamBin}, write)
end,
StreamBins
).
%%--------------------------------------------------------------------------------
transaction(Fun) ->
@ -724,3 +780,63 @@ ensure_timer(Type) ->
ensure_timer(Type, Timeout) ->
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
ok.
-ifdef(TEST).
list_all_sessions() ->
DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),
Sessions = lists:map(
fun(SessionID) ->
{ok, Session, Subscriptions} = session_open(SessionID),
{SessionID, #{session => Session, subscriptions => Subscriptions}}
end,
DSSessionIds
),
maps:from_list(Sessions).
list_all_subscriptions() ->
DSSubIds = mnesia:dirty_all_keys(?SESSION_SUBSCRIPTIONS_TAB),
Subscriptions = lists:map(
fun(DSSubId) ->
[DSSub] = mnesia:dirty_read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId),
{DSSubId, export_subscription(DSSub)}
end,
DSSubIds
),
maps:from_list(Subscriptions).
list_all_streams() ->
DSStreamIds = mnesia:dirty_all_keys(?SESSION_STREAM_TAB),
DSStreams = lists:map(
fun(DSStreamId) ->
Records = mnesia:dirty_read(?SESSION_STREAM_TAB, DSStreamId),
ExtDSStreams =
lists:map(
fun(Record) ->
export_record(
Record,
#ds_stream.session,
[session, topic_filter, stream, rank],
#{}
)
end,
Records
),
{DSStreamId, ExtDSStreams}
end,
DSStreamIds
),
maps:from_list(DSStreams).
list_all_iterators() ->
DSIterIds = mnesia:dirty_all_keys(?SESSION_ITER_TAB),
DSIters = lists:map(
fun(DSIterId) ->
[Record] = mnesia:dirty_read(?SESSION_ITER_TAB, DSIterId),
{DSIterId, export_record(Record, #ds_iter.id, [id, iter], #{})}
end,
DSIterIds
),
maps:from_list(DSIters).
%% ifdef(TEST)
-endif.

View File

@ -39,9 +39,10 @@
rank :: emqx_ds:stream_rank()
}).
-type ds_stream() :: #ds_stream{}.
-type ds_stream_bin() :: binary().
-record(ds_iter, {
id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()},
id :: {emqx_persistent_session_ds:id(), ds_stream_bin()},
iter :: emqx_ds:iterator()
}).

View File

@ -176,6 +176,7 @@
t().
-callback open(clientinfo(), conninfo()) ->
{_IsPresent :: true, t(), _ReplayContext} | false.
-callback destroy(t() | clientinfo()) -> ok.
%%--------------------------------------------------------------------
%% Create a Session
@ -247,7 +248,14 @@ get_mqtt_conf(Zone, Key) ->
-spec destroy(clientinfo(), conninfo()) -> ok.
destroy(ClientInfo, ConnInfo) ->
(choose_impl_mod(ConnInfo)):destroy(ClientInfo).
%% When destroying/discarding a session, the current `ClientInfo' might suggest an
%% implementation which does not correspond to the one previously used by this client.
%% An example of this is a client that first connects with `Session-Expiry-Interval' >
%% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' =
%% true. So we may simply destroy sessions from all implementations, since the key
%% (ClientID) is the same.
Mods = choose_impl_candidates(ConnInfo),
lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods).
-spec destroy(t()) -> ok.
destroy(Session) ->

View File

@ -44,6 +44,8 @@
%% State is stored in-memory in the process heap.
-module(emqx_session_mem).
-behaviour(emqx_session).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_session_mem.hrl").

View File

@ -599,6 +599,7 @@ t_publish_while_client_is_gone(Config) ->
ok = emqtt:disconnect(Client2).
%% TODO: don't skip after QoS2 support is added to DS.
t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
t_clean_start_drops_subscriptions('end', _Config) -> ok.
t_clean_start_drops_subscriptions(Config) ->