Merge pull request #11930 from thalesmg/ds-session-discard-m-20231110
feat(ds): implement session discard
This commit is contained in:
commit
01a2a3b1c0
|
@ -11,6 +11,8 @@
|
|||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("emqx/src/emqx_persistent_session_ds.hrl").
|
||||
|
||||
-define(DEFAULT_KEYSPACE, default).
|
||||
-define(DS_SHARD_ID, <<"local">>).
|
||||
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
|
||||
|
@ -118,6 +120,7 @@ start_client(Opts0 = #{}) ->
|
|||
properties => #{'Session-Expiry-Interval' => 300}
|
||||
},
|
||||
Opts = maps:to_list(emqx_utils_maps:deep_merge(Defaults, Opts0)),
|
||||
ct:pal("starting client with opts:\n ~p", [Opts]),
|
||||
{ok, Client} = emqtt:start_link(Opts),
|
||||
on_exit(fun() -> catch emqtt:stop(Client) end),
|
||||
Client.
|
||||
|
@ -148,6 +151,9 @@ restart_node(Node, NodeSpec) ->
|
|||
?tp(restarted_node, #{}),
|
||||
ok.
|
||||
|
||||
is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) ->
|
||||
EI > 0.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -309,3 +315,94 @@ t_session_unsubscription_idempotency(Config) ->
|
|||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_session_discard_persistent_to_non_persistent(_Config) ->
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Params = #{
|
||||
client_id => ClientId,
|
||||
reconnect_opts =>
|
||||
#{
|
||||
clean_start => true,
|
||||
%% we set it to zero so that a new session is not created.
|
||||
properties => #{'Session-Expiry-Interval' => 0},
|
||||
proto_ver => v5
|
||||
}
|
||||
},
|
||||
do_t_session_discard(Params).
|
||||
|
||||
t_session_discard_persistent_to_persistent(_Config) ->
|
||||
ClientId = atom_to_binary(?FUNCTION_NAME),
|
||||
Params = #{
|
||||
client_id => ClientId,
|
||||
reconnect_opts =>
|
||||
#{
|
||||
clean_start => true,
|
||||
properties => #{'Session-Expiry-Interval' => 30},
|
||||
proto_ver => v5
|
||||
}
|
||||
},
|
||||
do_t_session_discard(Params).
|
||||
|
||||
do_t_session_discard(Params) ->
|
||||
#{
|
||||
client_id := ClientId,
|
||||
reconnect_opts := ReconnectOpts0
|
||||
} = Params,
|
||||
ReconnectOpts = ReconnectOpts0#{clientid => ClientId},
|
||||
SubTopicFilter = <<"t/+">>,
|
||||
?check_trace(
|
||||
begin
|
||||
?tp(notice, "starting", #{}),
|
||||
Client0 = start_client(#{
|
||||
clientid => ClientId,
|
||||
clean_start => false,
|
||||
properties => #{'Session-Expiry-Interval' => 30},
|
||||
proto_ver => v5
|
||||
}),
|
||||
{ok, _} = emqtt:connect(Client0),
|
||||
?tp(notice, "subscribing", #{}),
|
||||
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client0, SubTopicFilter, qos2),
|
||||
%% Store some matching messages so that streams and iterators are created.
|
||||
ok = emqtt:publish(Client0, <<"t/1">>, <<"1">>),
|
||||
ok = emqtt:publish(Client0, <<"t/2">>, <<"2">>),
|
||||
?retry(
|
||||
_Sleep0 = 100,
|
||||
_Attempts0 = 50,
|
||||
true = map_size(emqx_persistent_session_ds:list_all_streams()) > 0
|
||||
),
|
||||
?retry(
|
||||
_Sleep0 = 100,
|
||||
_Attempts0 = 50,
|
||||
true = map_size(emqx_persistent_session_ds:list_all_iterators()) > 0
|
||||
),
|
||||
ok = emqtt:stop(Client0),
|
||||
?tp(notice, "disconnected", #{}),
|
||||
|
||||
?tp(notice, "reconnecting", #{}),
|
||||
%% we still have iterators and streams
|
||||
?assert(map_size(emqx_persistent_session_ds:list_all_streams()) > 0),
|
||||
?assert(map_size(emqx_persistent_session_ds:list_all_iterators()) > 0),
|
||||
Client1 = start_client(ReconnectOpts),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
?assertEqual([], emqtt:subscriptions(Client1)),
|
||||
case is_persistent_connect_opts(ReconnectOpts) of
|
||||
true ->
|
||||
?assertMatch(#{ClientId := _}, emqx_persistent_session_ds:list_all_sessions());
|
||||
false ->
|
||||
?assertEqual(#{}, emqx_persistent_session_ds:list_all_sessions())
|
||||
end,
|
||||
?assertEqual(#{}, emqx_persistent_session_ds:list_all_subscriptions()),
|
||||
?assertEqual([], emqx_persistent_session_ds_router:topics()),
|
||||
?assertEqual(#{}, emqx_persistent_session_ds:list_all_streams()),
|
||||
?assertEqual(#{}, emqx_persistent_session_ds:list_all_iterators()),
|
||||
ok = emqtt:stop(Client1),
|
||||
?tp(notice, "disconnected", #{}),
|
||||
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
ct:pal("trace:\n ~p", [Trace]),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -258,21 +258,21 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
|
|||
end.
|
||||
|
||||
%% @doc Open a session.
|
||||
-spec open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) ->
|
||||
-spec open_session(_CleanStart :: boolean(), emqx_types:clientinfo(), emqx_types:conninfo()) ->
|
||||
{ok, #{
|
||||
session := emqx_session:t(),
|
||||
present := boolean(),
|
||||
replay => _ReplayContext
|
||||
}}
|
||||
| {error, Reason :: term()}.
|
||||
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
open_session(_CleanStart = true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
Self = self(),
|
||||
emqx_cm_locker:trans(ClientId, fun(_) ->
|
||||
ok = discard_session(ClientId),
|
||||
ok = emqx_session:destroy(ClientInfo, ConnInfo),
|
||||
create_register_session(ClientInfo, ConnInfo, Self)
|
||||
end);
|
||||
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
open_session(_CleanStart = false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||
Self = self(),
|
||||
emqx_cm_locker:trans(ClientId, fun(_) ->
|
||||
case emqx_session:open(ClientInfo, ConnInfo) of
|
||||
|
|
|
@ -188,8 +188,12 @@ fetch(SessionId, Inflight0, [Stream | Streams], N, Publishes0) ->
|
|||
end.
|
||||
|
||||
-spec update_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream(), emqx_ds:iterator()) -> ok.
|
||||
update_iterator(SessionId, Stream, Iterator) ->
|
||||
mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {SessionId, Stream}, iter = Iterator}).
|
||||
update_iterator(DSSessionId, Stream, Iterator) ->
|
||||
%% Workaround: we convert `Stream' to a binary before attempting to store it in
|
||||
%% mnesia(rocksdb) because of a bug in `mnesia_rocksdb' when trying to do
|
||||
%% `mnesia:dirty_all_keys' later.
|
||||
StreamBin = term_to_binary(Stream),
|
||||
mria:dirty_write(?SESSION_ITER_TAB, #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator}).
|
||||
|
||||
get_last_iterator(SessionId, Stream, Ranges) ->
|
||||
case lists:keyfind(Stream, #range.stream, lists:reverse(Ranges)) of
|
||||
|
@ -200,8 +204,10 @@ get_last_iterator(SessionId, Stream, Ranges) ->
|
|||
end.
|
||||
|
||||
-spec get_iterator(emqx_persistent_session_ds:id(), emqx_ds:stream()) -> emqx_ds:iterator().
|
||||
get_iterator(SessionId, Stream) ->
|
||||
Id = {SessionId, Stream},
|
||||
get_iterator(DSSessionId, Stream) ->
|
||||
%% See comment in `update_iterator'.
|
||||
StreamBin = term_to_binary(Stream),
|
||||
Id = {DSSessionId, StreamBin},
|
||||
[#ds_iter{iter = It}] = mnesia:dirty_read(?SESSION_ITER_TAB, Id),
|
||||
It.
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_persistent_session_ds).
|
||||
|
||||
-behaviour(emqx_session).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
|
@ -69,7 +71,13 @@
|
|||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([session_open/1]).
|
||||
-export([
|
||||
session_open/1,
|
||||
list_all_sessions/0,
|
||||
list_all_subscriptions/0,
|
||||
list_all_streams/0,
|
||||
list_all_iterators/0
|
||||
]).
|
||||
-endif.
|
||||
|
||||
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
|
||||
|
@ -537,14 +545,24 @@ session_create(SessionId, Props) ->
|
|||
-spec session_drop(id()) -> ok.
|
||||
session_drop(DSSessionId) ->
|
||||
transaction(fun() ->
|
||||
%% TODO: ensure all iterators from this clientid are closed?
|
||||
ok = session_drop_subscriptions(DSSessionId),
|
||||
ok = session_drop_iterators(DSSessionId),
|
||||
ok = session_drop_streams(DSSessionId),
|
||||
ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
|
||||
end).
|
||||
|
||||
-spec session_drop_subscriptions(id()) -> ok.
|
||||
session_drop_subscriptions(DSSessionId) ->
|
||||
IteratorRefs = session_read_subscriptions(DSSessionId),
|
||||
ok = lists:foreach(fun session_del_subscription/1, IteratorRefs).
|
||||
Subscriptions = session_read_subscriptions(DSSessionId),
|
||||
lists:foreach(
|
||||
fun(#ds_sub{id = DSSubId} = DSSub) ->
|
||||
TopicFilter = subscription_id_to_topic_filter(DSSubId),
|
||||
TopicFilterBin = emqx_topic:join(TopicFilter),
|
||||
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId),
|
||||
ok = session_del_subscription(DSSub)
|
||||
end,
|
||||
Subscriptions
|
||||
).
|
||||
|
||||
%% @doc Called when a client subscribes to a topic. Idempotent.
|
||||
-spec session_add_subscription(id(), topic_filter(), _Props :: map()) ->
|
||||
|
@ -615,6 +633,10 @@ new_subscription_id(DSSessionId, TopicFilter) ->
|
|||
DSSubId = {DSSessionId, TopicFilter},
|
||||
{DSSubId, NowMS}.
|
||||
|
||||
-spec subscription_id_to_topic_filter(subscription_id()) -> topic_filter().
|
||||
subscription_id_to_topic_filter({_DSSessionId, TopicFilter}) ->
|
||||
TopicFilter.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% RPC targets (v1)
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -639,24 +661,26 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
|
|||
%% Reading batches
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
renew_streams(Id) ->
|
||||
Subscriptions = ro_transaction(fun() -> session_read_subscriptions(Id) end),
|
||||
ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, Id) end),
|
||||
-spec renew_streams(id()) -> ok.
|
||||
renew_streams(DSSessionId) ->
|
||||
Subscriptions = ro_transaction(fun() -> session_read_subscriptions(DSSessionId) end),
|
||||
ExistingStreams = ro_transaction(fun() -> mnesia:read(?SESSION_STREAM_TAB, DSSessionId) end),
|
||||
lists:foreach(
|
||||
fun(#ds_sub{id = {_, TopicFilter}, start_time = StartTime}) ->
|
||||
renew_streams(Id, ExistingStreams, TopicFilter, StartTime)
|
||||
renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime)
|
||||
end,
|
||||
Subscriptions
|
||||
).
|
||||
|
||||
renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
|
||||
-spec renew_streams(id(), [ds_stream()], emqx_ds:topic_filter(), emqx_ds:time()) -> ok.
|
||||
renew_streams(DSSessionId, ExistingStreams, TopicFilter, StartTime) ->
|
||||
AllStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime),
|
||||
transaction(
|
||||
fun() ->
|
||||
lists:foreach(
|
||||
fun({Rank, Stream}) ->
|
||||
Rec = #ds_stream{
|
||||
session = Id,
|
||||
session = DSSessionId,
|
||||
topic_filter = TopicFilter,
|
||||
stream = Stream,
|
||||
rank = Rank
|
||||
|
@ -669,7 +693,12 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
|
|||
{ok, Iterator} = emqx_ds:make_iterator(
|
||||
?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime
|
||||
),
|
||||
IterRec = #ds_iter{id = {Id, Stream}, iter = Iterator},
|
||||
%% Workaround: we convert `Stream' to a binary before
|
||||
%% attempting to store it in mnesia(rocksdb) because of a bug
|
||||
%% in `mnesia_rocksdb' when trying to do
|
||||
%% `mnesia:dirty_all_keys' later.
|
||||
StreamBin = term_to_binary(Stream),
|
||||
IterRec = #ds_iter{id = {DSSessionId, StreamBin}, iter = Iterator},
|
||||
mnesia:write(?SESSION_ITER_TAB, IterRec, write)
|
||||
end
|
||||
end,
|
||||
|
@ -678,6 +707,33 @@ renew_streams(Id, ExistingStreams, TopicFilter, StartTime) ->
|
|||
end
|
||||
).
|
||||
|
||||
%% must be called inside a transaction
|
||||
-spec session_drop_streams(id()) -> ok.
|
||||
session_drop_streams(DSSessionId) ->
|
||||
MS = ets:fun2ms(
|
||||
fun(#ds_stream{session = DSSessionId0}) when DSSessionId0 =:= DSSessionId ->
|
||||
DSSessionId0
|
||||
end
|
||||
),
|
||||
StreamIDs = mnesia:select(?SESSION_STREAM_TAB, MS, write),
|
||||
lists:foreach(fun(Key) -> mnesia:delete(?SESSION_STREAM_TAB, Key, write) end, StreamIDs).
|
||||
|
||||
%% must be called inside a transaction
|
||||
-spec session_drop_iterators(id()) -> ok.
|
||||
session_drop_iterators(DSSessionId) ->
|
||||
MS = ets:fun2ms(
|
||||
fun(#ds_iter{id = {DSSessionId0, StreamBin}}) when DSSessionId0 =:= DSSessionId ->
|
||||
StreamBin
|
||||
end
|
||||
),
|
||||
StreamBins = mnesia:select(?SESSION_ITER_TAB, MS, write),
|
||||
lists:foreach(
|
||||
fun(StreamBin) ->
|
||||
mnesia:delete(?SESSION_ITER_TAB, {DSSessionId, StreamBin}, write)
|
||||
end,
|
||||
StreamBins
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------------------
|
||||
|
||||
transaction(Fun) ->
|
||||
|
@ -724,3 +780,63 @@ ensure_timer(Type) ->
|
|||
ensure_timer(Type, Timeout) ->
|
||||
_ = emqx_utils:start_timer(Timeout, {emqx_session, Type}),
|
||||
ok.
|
||||
|
||||
-ifdef(TEST).
|
||||
list_all_sessions() ->
|
||||
DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),
|
||||
Sessions = lists:map(
|
||||
fun(SessionID) ->
|
||||
{ok, Session, Subscriptions} = session_open(SessionID),
|
||||
{SessionID, #{session => Session, subscriptions => Subscriptions}}
|
||||
end,
|
||||
DSSessionIds
|
||||
),
|
||||
maps:from_list(Sessions).
|
||||
|
||||
list_all_subscriptions() ->
|
||||
DSSubIds = mnesia:dirty_all_keys(?SESSION_SUBSCRIPTIONS_TAB),
|
||||
Subscriptions = lists:map(
|
||||
fun(DSSubId) ->
|
||||
[DSSub] = mnesia:dirty_read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId),
|
||||
{DSSubId, export_subscription(DSSub)}
|
||||
end,
|
||||
DSSubIds
|
||||
),
|
||||
maps:from_list(Subscriptions).
|
||||
|
||||
list_all_streams() ->
|
||||
DSStreamIds = mnesia:dirty_all_keys(?SESSION_STREAM_TAB),
|
||||
DSStreams = lists:map(
|
||||
fun(DSStreamId) ->
|
||||
Records = mnesia:dirty_read(?SESSION_STREAM_TAB, DSStreamId),
|
||||
ExtDSStreams =
|
||||
lists:map(
|
||||
fun(Record) ->
|
||||
export_record(
|
||||
Record,
|
||||
#ds_stream.session,
|
||||
[session, topic_filter, stream, rank],
|
||||
#{}
|
||||
)
|
||||
end,
|
||||
Records
|
||||
),
|
||||
{DSStreamId, ExtDSStreams}
|
||||
end,
|
||||
DSStreamIds
|
||||
),
|
||||
maps:from_list(DSStreams).
|
||||
|
||||
list_all_iterators() ->
|
||||
DSIterIds = mnesia:dirty_all_keys(?SESSION_ITER_TAB),
|
||||
DSIters = lists:map(
|
||||
fun(DSIterId) ->
|
||||
[Record] = mnesia:dirty_read(?SESSION_ITER_TAB, DSIterId),
|
||||
{DSIterId, export_record(Record, #ds_iter.id, [id, iter], #{})}
|
||||
end,
|
||||
DSIterIds
|
||||
),
|
||||
maps:from_list(DSIters).
|
||||
|
||||
%% ifdef(TEST)
|
||||
-endif.
|
||||
|
|
|
@ -39,9 +39,10 @@
|
|||
rank :: emqx_ds:stream_rank()
|
||||
}).
|
||||
-type ds_stream() :: #ds_stream{}.
|
||||
-type ds_stream_bin() :: binary().
|
||||
|
||||
-record(ds_iter, {
|
||||
id :: {emqx_persistent_session_ds:id(), emqx_ds:stream()},
|
||||
id :: {emqx_persistent_session_ds:id(), ds_stream_bin()},
|
||||
iter :: emqx_ds:iterator()
|
||||
}).
|
||||
|
||||
|
|
|
@ -176,6 +176,7 @@
|
|||
t().
|
||||
-callback open(clientinfo(), conninfo()) ->
|
||||
{_IsPresent :: true, t(), _ReplayContext} | false.
|
||||
-callback destroy(t() | clientinfo()) -> ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Create a Session
|
||||
|
@ -247,7 +248,14 @@ get_mqtt_conf(Zone, Key) ->
|
|||
|
||||
-spec destroy(clientinfo(), conninfo()) -> ok.
|
||||
destroy(ClientInfo, ConnInfo) ->
|
||||
(choose_impl_mod(ConnInfo)):destroy(ClientInfo).
|
||||
%% When destroying/discarding a session, the current `ClientInfo' might suggest an
|
||||
%% implementation which does not correspond to the one previously used by this client.
|
||||
%% An example of this is a client that first connects with `Session-Expiry-Interval' >
|
||||
%% 0, and later reconnects with `Session-Expiry-Interval' = 0 and `clean_start' =
|
||||
%% true. So we may simply destroy sessions from all implementations, since the key
|
||||
%% (ClientID) is the same.
|
||||
Mods = choose_impl_candidates(ConnInfo),
|
||||
lists:foreach(fun(Mod) -> Mod:destroy(ClientInfo) end, Mods).
|
||||
|
||||
-spec destroy(t()) -> ok.
|
||||
destroy(Session) ->
|
||||
|
|
|
@ -44,6 +44,8 @@
|
|||
%% State is stored in-memory in the process heap.
|
||||
-module(emqx_session_mem).
|
||||
|
||||
-behaviour(emqx_session).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("emqx_session_mem.hrl").
|
||||
|
|
|
@ -599,6 +599,7 @@ t_publish_while_client_is_gone(Config) ->
|
|||
|
||||
ok = emqtt:disconnect(Client2).
|
||||
|
||||
%% TODO: don't skip after QoS2 support is added to DS.
|
||||
t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
|
||||
t_clean_start_drops_subscriptions('end', _Config) -> ok.
|
||||
t_clean_start_drops_subscriptions(Config) ->
|
||||
|
|
Loading…
Reference in New Issue