Merge pull request #11293 from keynslug/fix/EMQX-9591/phase-out-persistence

chore(ps): phase out existing session persistence mechanism
This commit is contained in:
Andrew Mayorov 2023-07-19 10:36:46 +02:00 committed by GitHub
commit 92166f287a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 87 additions and 138 deletions

View File

@ -224,7 +224,6 @@ publish(Msg) when is_record(Msg, message) ->
}), }),
[]; [];
Msg1 = #message{topic = Topic} -> Msg1 = #message{topic = Topic} ->
emqx_persistent_session:persist_message(Msg1),
_ = emqx_persistent_session_ds:persist_message(Msg1), _ = emqx_persistent_session_ds:persist_message(Msg1),
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
end. end.

View File

@ -61,8 +61,7 @@
%% Export for emqx_channel implementations %% Export for emqx_channel implementations
-export([ -export([
maybe_nack/1, maybe_nack/1
maybe_mark_as_delivered/2
]). ]).
%% Exports for CT %% Exports for CT
@ -199,11 +198,6 @@ info(timers, #channel{timers = Timers}) ->
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.
set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
%% Assume that this is also an updated session. Allow side effect.
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#channel{session = Session1}.
-spec stats(channel()) -> emqx_types:stats(). -spec stats(channel()) -> emqx_types:stats().
stats(#channel{session = undefined}) -> stats(#channel{session = undefined}) ->
emqx_pd:get_counters(?CHANNEL_METRICS); emqx_pd:get_counters(?CHANNEL_METRICS);
@ -417,10 +411,10 @@ handle_in(
case emqx_session:puback(ClientInfo, PacketId, Session) of case emqx_session:puback(ClientInfo, PacketId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties), ok = after_message_acked(ClientInfo, Msg, Properties),
{ok, set_session(NSession, Channel)}; {ok, Channel#channel{session = NSession}};
{ok, Msg, Publishes, NSession} -> {ok, Msg, Publishes, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties), ok = after_message_acked(ClientInfo, Msg, Properties),
handle_out(publish, Publishes, set_session(NSession, Channel)); handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}), ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
ok = emqx_metrics:inc('packets.puback.inuse'), ok = emqx_metrics:inc('packets.puback.inuse'),
@ -438,7 +432,7 @@ handle_in(
case emqx_session:pubrec(ClientInfo, PacketId, Session) of case emqx_session:pubrec(ClientInfo, PacketId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Properties), ok = after_message_acked(ClientInfo, Msg, Properties),
NChannel = set_session(NSession, Channel), NChannel = Channel#channel{session = NSession},
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}), ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
@ -458,7 +452,7 @@ handle_in(
) -> ) ->
case emqx_session:pubrel(ClientInfo, PacketId, Session) of case emqx_session:pubrel(ClientInfo, PacketId, Session) of
{ok, NSession} -> {ok, NSession} ->
NChannel = set_session(NSession, Channel), NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
@ -473,9 +467,9 @@ handle_in(
) -> ) ->
case emqx_session:pubcomp(ClientInfo, PacketId, Session) of case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, set_session(NSession, Channel)}; {ok, Channel#channel{session = NSession}};
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
handle_out(publish, Publishes, set_session(NSession, Channel)); handle_out(publish, Publishes, Channel#channel{session = NSession});
{error, ?RC_PACKET_IDENTIFIER_IN_USE} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
ok = emqx_metrics:inc('packets.pubcomp.inuse'), ok = emqx_metrics:inc('packets.pubcomp.inuse'),
{ok, Channel}; {ok, Channel};
@ -734,7 +728,7 @@ do_publish(
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
{ok, PubRes, NSession} -> {ok, PubRes, NSession} ->
RC = pubrec_reason_code(PubRes), RC = pubrec_reason_code(PubRes),
NChannel0 = set_session(NSession, Channel), NChannel0 = Channel#channel{session = NSession},
NChannel1 = ensure_timer(await_timer, NChannel0), NChannel1 = ensure_timer(await_timer, NChannel0),
NChannel2 = ensure_quota(PubRes, NChannel1), NChannel2 = ensure_quota(PubRes, NChannel1),
handle_out(pubrec, {PacketId, RC}, NChannel2); handle_out(pubrec, {PacketId, RC}, NChannel2);
@ -830,7 +824,7 @@ do_subscribe(
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
{ok, NSession} -> {ok, NSession} ->
{QoS, set_session(NSession, Channel)}; {QoS, Channel#channel{session = NSession}};
{error, RC} -> {error, RC} ->
?SLOG( ?SLOG(
warning, warning,
@ -869,7 +863,7 @@ do_unsubscribe(
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
{ok, NSession} -> {ok, NSession} ->
{?RC_SUCCESS, set_session(NSession, Channel)}; {?RC_SUCCESS, Channel#channel{session = NSession}};
{error, RC} -> {error, RC} ->
{RC, Channel} {RC, Channel}
end. end.
@ -898,7 +892,7 @@ process_disconnect(ReasonCode, Properties, Channel) ->
maybe_update_expiry_interval( maybe_update_expiry_interval(
#{'Session-Expiry-Interval' := Interval}, #{'Session-Expiry-Interval' := Interval},
Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo} Channel = #channel{conninfo = ConnInfo}
) -> ) ->
EI = timer:seconds(Interval), EI = timer:seconds(Interval),
OldEI = maps:get(expiry_interval, ConnInfo, 0), OldEI = maps:get(expiry_interval, ConnInfo, 0),
@ -907,12 +901,11 @@ maybe_update_expiry_interval(
Channel; Channel;
false -> false ->
NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}}, NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}},
ClientID = maps:get(clientid, ClientInfo, undefined),
%% Check if the client turns off persistence (turning it on is disallowed) %% Check if the client turns off persistence (turning it on is disallowed)
case EI =:= 0 andalso OldEI > 0 of case EI =:= 0 andalso OldEI > 0 of
true -> true ->
S = emqx_persistent_session:discard(ClientID, NChannel#channel.session), NSession = emqx_session:unpersist(NChannel#channel.session),
set_session(S, NChannel); NChannel#channel{session = NSession};
false -> false ->
NChannel NChannel
end end
@ -956,9 +949,7 @@ handle_deliver(
Delivers1 = maybe_nack(Delivers), Delivers1 = maybe_nack(Delivers),
Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
NChannel = set_session(NSession, Channel), NChannel = Channel#channel{session = NSession},
%% We consider queued/dropped messages as delivered since they are now in the session state.
maybe_mark_as_delivered(Session, Delivers),
{ok, NChannel}; {ok, NChannel};
handle_deliver( handle_deliver(
Delivers, Delivers,
@ -976,11 +967,10 @@ handle_deliver(
) )
of of
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
NChannel = set_session(NSession, Channel), NChannel = Channel#channel{session = NSession},
maybe_mark_as_delivered(NSession, Delivers),
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
{ok, NSession} -> {ok, NSession} ->
{ok, set_session(NSession, Channel)} {ok, Channel#channel{session = NSession}}
end. end.
%% Nack delivers from shared subscription %% Nack delivers from shared subscription
@ -996,15 +986,6 @@ not_nacked({deliver, _Topic, Msg}) ->
true true
end. end.
maybe_mark_as_delivered(Session, Delivers) ->
case emqx_session:info(is_persistent, Session) of
false ->
skip;
true ->
SessionID = emqx_session:info(id, Session),
emqx_persistent_session:mark_as_delivered(SessionID, Delivers)
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle outgoing packet %% Handle outgoing packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -1096,11 +1077,11 @@ return_connack(AckPacket, Channel) ->
ignore -> ignore ->
{ok, Replies, Channel}; {ok, Replies, Channel};
{ok, Publishes, NSession} -> {ok, Publishes, NSession} ->
NChannel0 = Channel#channel{ NChannel1 = Channel#channel{
resuming = false, resuming = false,
pendings = [] pendings = [],
session = NSession
}, },
NChannel1 = set_session(NSession, NChannel0),
{Packets, NChannel2} = do_deliver(Publishes, NChannel1), {Packets, NChannel2} = do_deliver(Publishes, NChannel1),
Outgoing = [{outgoing, Packets} || length(Packets) > 0], Outgoing = [{outgoing, Packets} || length(Packets) > 0],
{ok, Replies ++ Outgoing, NChannel2} {ok, Replies ++ Outgoing, NChannel2}
@ -1345,9 +1326,10 @@ handle_timeout(
) -> ) ->
case emqx_session:retry(ClientInfo, Session) of case emqx_session:retry(ClientInfo, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(retry_timer, set_session(NSession, Channel))}; NChannel = Channel#channel{session = NSession},
{ok, clean_timer(retry_timer, NChannel)};
{ok, Publishes, Timeout, NSession} -> {ok, Publishes, Timeout, NSession} ->
NChannel = set_session(NSession, Channel), NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
end; end;
handle_timeout( handle_timeout(
@ -1363,9 +1345,11 @@ handle_timeout(
) -> ) ->
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of case emqx_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(await_timer, set_session(NSession, Channel))}; NChannel = Channel#channel{session = NSession},
{ok, clean_timer(await_timer, NChannel)};
{ok, Timeout, NSession} -> {ok, Timeout, NSession} ->
{ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))} NChannel = Channel#channel{session = NSession},
{ok, reset_timer(await_timer, Timeout, NChannel)}
end; end;
handle_timeout(_TRef, expire_session, Channel) -> handle_timeout(_TRef, expire_session, Channel) ->
shutdown(expired, Channel); shutdown(expired, Channel);
@ -1453,25 +1437,11 @@ terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg
%% if will_msg still exists when the session is terminated, it %% if will_msg still exists when the session is terminated, it
%% must be published immediately. %% must be published immediately.
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg),
(Reason =:= expired) andalso persist_if_session(Channel),
run_terminate_hook(Reason, Channel). run_terminate_hook(Reason, Channel).
persist_if_session(#channel{session = Session} = Channel) -> run_terminate_hook(_Reason, #channel{session = undefined}) ->
case emqx_session:is_session(Session) of
true ->
_ = emqx_persistent_session:persist(
Channel#channel.clientinfo,
Channel#channel.conninfo,
Channel#channel.session
),
ok; ok;
false -> run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
ok
end.
run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) ->
ok;
run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session} = _Channel) ->
emqx_session:terminate(ClientInfo, Reason, Session). emqx_session:terminate(ClientInfo, Reason, Session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -2096,11 +2066,9 @@ maybe_resume_session(#channel{
session = Session, session = Session,
resuming = true, resuming = true,
pendings = Pendings, pendings = Pendings,
clientinfo = #{clientid := ClientId} = ClientInfo clientinfo = ClientInfo
}) -> }) ->
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session),
%% We consider queued/dropped messages as delivered since they are now in the session state.
emqx_persistent_session:mark_as_delivered(ClientId, Pendings),
case emqx_session:deliver(ClientInfo, Pendings, Session1) of case emqx_session:deliver(ClientInfo, Pendings, Session1) of
{ok, Session2} -> {ok, Session2} ->
{ok, Publishes, Session2}; {ok, Publishes, Session2};

View File

@ -277,65 +277,24 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(), Self = self(),
CleanStart = fun(_) -> CleanStart = fun(_) ->
ok = discard_session(ClientId), ok = discard_session(ClientId),
ok = emqx_persistent_session:discard_if_present(ClientId), ok = emqx_session:destroy(ClientId),
Session = create_session(ClientInfo, ConnInfo), create_register_session(ClientInfo, ConnInfo, Self)
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, present => false}}
end, end,
emqx_cm_locker:trans(ClientId, CleanStart); emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(), Self = self(),
ResumeStart = fun(_) -> ResumeStart = fun(_) ->
CreateSess =
fun() ->
Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(
ClientInfo, ConnInfo, Session
),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, present => false}}
end,
case takeover_session(ClientId) of case takeover_session(ClientId) of
{persistent, Session} ->
%% This is a persistent session without a managing process.
{Session1, Pendings} =
emqx_persistent_session:resume(ClientInfo, ConnInfo, Session),
register_channel(ClientId, Self, ConnInfo),
{ok, #{
session => clean_session(Session1),
present => true,
pendings => clean_pendings(Pendings)
}};
{living, ConnMod, ChanPid, Session} -> {living, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = emqx_session:resume(ClientInfo, Session),
case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of
{ok, Pendings} -> {ok, Pendings} ->
Session1 = emqx_persistent_session:persist( clean_register_session(Session, Pendings, ClientInfo, ConnInfo, Self);
ClientInfo, ConnInfo, Session
),
register_channel(ClientId, Self, ConnInfo),
{ok, #{
session => clean_session(Session1),
present => true,
pendings => clean_pendings(Pendings)
}};
{error, _} -> {error, _} ->
CreateSess() create_register_session(ClientInfo, ConnInfo, Self)
end; end;
{expired, OldSession} ->
_ = emqx_persistent_session:discard(ClientId, OldSession),
Session = create_session(ClientInfo, ConnInfo),
Session1 = emqx_persistent_session:persist(
ClientInfo,
ConnInfo,
Session
),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session1, present => false}};
none -> none ->
CreateSess() create_register_session(ClientInfo, ConnInfo, Self)
end end
end, end,
emqx_cm_locker:trans(ClientId, ResumeStart). emqx_cm_locker:trans(ClientId, ResumeStart).
@ -347,6 +306,19 @@ create_session(ClientInfo, ConnInfo) ->
ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]),
Session. Session.
create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) ->
Session = create_session(ClientInfo, ConnInfo),
ok = register_channel(ClientId, ChanPid, ConnInfo),
{ok, #{session => Session, present => false}}.
clean_register_session(Session, Pendings, #{clientid := ClientId}, ConnInfo, ChanPid) ->
ok = register_channel(ClientId, ChanPid, ConnInfo),
{ok, #{
session => clean_session(Session),
present => true,
pendings => clean_pendings(Pendings)
}}.
get_session_confs(#{zone := Zone, clientid := ClientId}, #{ get_session_confs(#{zone := Zone, clientid := ClientId}, #{
receive_maximum := MaxInflight, expiry_interval := EI receive_maximum := MaxInflight, expiry_interval := EI
}) -> }) ->
@ -385,7 +357,7 @@ get_mqtt_conf(Zone, Key) ->
takeover_session(ClientId) -> takeover_session(ClientId) ->
case lookup_channels(ClientId) of case lookup_channels(ClientId) of
[] -> [] ->
emqx_persistent_session:lookup(ClientId); emqx_session:lookup(ClientId);
[ChanPid] -> [ChanPid] ->
takeover_session(ClientId, ChanPid); takeover_session(ClientId, ChanPid);
ChanPids -> ChanPids ->
@ -417,16 +389,16 @@ takeover_session(ClientId, Pid) ->
%% request_stepdown/3 %% request_stepdown/3
R == unexpected_exception R == unexpected_exception
-> ->
emqx_persistent_session:lookup(ClientId); emqx_session:lookup(ClientId);
% rpc_call/3 % rpc_call/3
_:{'EXIT', {noproc, _}} -> _:{'EXIT', {noproc, _}} ->
emqx_persistent_session:lookup(ClientId) emqx_session:lookup(ClientId)
end. end.
do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chann_conn_mod(ClientId, ChanPid) of case get_chann_conn_mod(ClientId, ChanPid) of
undefined -> undefined ->
emqx_persistent_session:lookup(ClientId); emqx_session:lookup(ClientId);
ConnMod when is_atom(ConnMod) -> ConnMod when is_atom(ConnMod) ->
case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} -> {ok, Session} ->

View File

@ -47,13 +47,18 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-endif. -endif.
-export([
lookup/1,
destroy/1,
unpersist/1
]).
-export([init/1]). -export([init/1]).
-export([ -export([
@ -226,6 +231,23 @@ init(Opts) ->
created_at = erlang:system_time(millisecond) created_at = erlang:system_time(millisecond)
}. }.
-spec lookup(emqx_types:clientid()) -> none.
lookup(_ClientId) ->
% NOTE
% This is a stub. This session impl has no backing store, thus always `none`.
none.
-spec destroy(emqx_types:clientid()) -> ok.
destroy(_ClientId) ->
% NOTE
% This is a stub. This session impl has no backing store, thus always `ok`.
ok.
-spec unpersist(session()) -> session().
unpersist(Session) ->
ok = destroy(Session#session.clientid),
Session#session{is_persistent = false}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Info, Stats %% Info, Stats
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -242,6 +264,8 @@ info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys]; [{Key, info(Key, Session)} || Key <- Keys];
info(id, #session{id = Id}) -> info(id, #session{id = Id}) ->
Id; Id;
info(clientid, #session{clientid = ClientId}) ->
ClientId;
info(is_persistent, #session{is_persistent = Bool}) -> info(is_persistent, #session{is_persistent = Bool}) ->
Bool; Bool;
info(subscriptions, #session{subscriptions = Subs}) -> info(subscriptions, #session{subscriptions = Subs}) ->
@ -321,13 +345,12 @@ subscribe(
ClientInfo = #{clientid := ClientId}, ClientInfo = #{clientid := ClientId},
TopicFilter, TopicFilter,
SubOpts, SubOpts,
Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs} Session = #session{subscriptions = Subs}
) -> ) ->
IsNew = not maps:is_key(TopicFilter, Subs), IsNew = not maps:is_key(TopicFilter, Subs),
case IsNew andalso is_subscriptions_full(Session) of case IsNew andalso is_subscriptions_full(Session) of
false -> false ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS),
ok = emqx_hooks:run( ok = emqx_hooks:run(
'session.subscribed', 'session.subscribed',
[ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}] [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]
@ -355,12 +378,11 @@ unsubscribe(
ClientInfo, ClientInfo,
TopicFilter, TopicFilter,
UnSubOpts, UnSubOpts,
Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS} Session = #session{subscriptions = Subs}
) -> ) ->
case maps:find(TopicFilter, Subs) of case maps:find(TopicFilter, Subs) of
{ok, SubOpts} -> {ok, SubOpts} ->
ok = emqx_broker:unsubscribe(TopicFilter), ok = emqx_broker:unsubscribe(TopicFilter),
ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS),
ok = emqx_hooks:run( ok = emqx_hooks:run(
'session.unsubscribed', 'session.unsubscribed',
[ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)] [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]

View File

@ -31,7 +31,10 @@
all() -> all() ->
[ [
{group, persistent_store_enabled}, % NOTE
% Tests are disabled while existing session persistence impl is being
% phased out.
% {group, persistent_store_enabled},
{group, persistent_store_disabled} {group, persistent_store_disabled}
]. ].

View File

@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [ {application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"}, {description, "EMQX Eviction Agent"},
{vsn, "5.0.1"}, {vsn, "5.1.0"},
{registered, [ {registered, [
emqx_eviction_agent_sup, emqx_eviction_agent_sup,
emqx_eviction_agent, emqx_eviction_agent,

View File

@ -165,9 +165,8 @@ handle_cast(Msg, Channel) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
{noreply, Channel}. {noreply, Channel}.
terminate(Reason, #{conninfo := ConnInfo, clientinfo := ClientInfo, session := Session} = Channel) -> terminate(Reason, #{clientinfo := ClientInfo, session := Session} = Channel) ->
ok = cancel_expiry_timer(Channel), ok = cancel_expiry_timer(Channel),
(Reason =:= expired) andalso emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
emqx_session:terminate(ClientInfo, Reason, Session). emqx_session:terminate(ClientInfo, Reason, Session).
code_change(_OldVsn, Channel, _Extra) -> code_change(_OldVsn, Channel, _Extra) ->
@ -205,10 +204,7 @@ handle_deliver(
Delivers1 = emqx_channel:maybe_nack(Delivers), Delivers1 = emqx_channel:maybe_nack(Delivers),
Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session),
NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session),
NChannel = persist(NSession, Channel), Channel#{session := NSession}.
%% We consider queued/dropped messages as delivered since they are now in the session state.
emqx_channel:maybe_mark_as_delivered(Session, Delivers),
NChannel.
cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) -> cancel_expiry_timer(#{expiry_timer := TRef}) when is_reference(TRef) ->
_ = erlang:cancel_timer(TRef), _ = erlang:cancel_timer(TRef),
@ -334,10 +330,6 @@ channel(ConnInfo, ClientInfo) ->
pendings => [] pendings => []
}. }.
persist(Session, #{clientinfo := ClientInfo, conninfo := ConnInfo} = Channel) ->
Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session),
Channel#{session => Session1}.
info(Channel) -> info(Channel) ->
#{ #{
conninfo => maps:get(conninfo, Channel, undefined), conninfo => maps:get(conninfo, Channel, undefined),

View File

@ -30,19 +30,12 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]). emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]).
init_per_testcase(t_persistence, Config) -> init_per_testcase(t_persistence, _Config) ->
emqx_config:put([persistent_session_store, enabled], true), {skip, "Existing session persistence implementation is being phased out"};
{ok, _} = emqx_persistent_session_sup:start_link(),
emqx_persistent_session:init_db_backend(),
?assert(emqx_persistent_session:is_store_enabled()),
Config;
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
end_per_testcase(t_persistence, Config) -> end_per_testcase(t_persistence, Config) ->
emqx_config:put([persistent_session_store, enabled], false),
emqx_persistent_session:init_db_backend(),
?assertNot(emqx_persistent_session:is_store_enabled()),
Config; Config;
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
ok. ok.