diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 859f6fc91..afa6dffe5 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -224,7 +224,6 @@ publish(Msg) when is_record(Msg, message) -> }), []; Msg1 = #message{topic = Topic} -> - emqx_persistent_session:persist_message(Msg1), _ = emqx_persistent_session_ds:persist_message(Msg1), route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 5e594d35f..d879e5a2d 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -61,8 +61,7 @@ %% Export for emqx_channel implementations -export([ - maybe_nack/1, - maybe_mark_as_delivered/2 + maybe_nack/1 ]). %% Exports for CT @@ -199,11 +198,6 @@ info(timers, #channel{timers = Timers}) -> set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. -set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> - %% Assume that this is also an updated session. Allow side effect. - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), - Channel#channel{session = Session1}. - -spec stats(channel()) -> emqx_types:stats(). stats(#channel{session = undefined}) -> emqx_pd:get_counters(?CHANNEL_METRICS); @@ -417,10 +411,10 @@ handle_in( case emqx_session:puback(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - {ok, set_session(NSession, Channel)}; + {ok, Channel#channel{session = NSession}}; {ok, Msg, Publishes, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - handle_out(publish, Publishes, set_session(NSession, Channel)); + handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}), ok = emqx_metrics:inc('packets.puback.inuse'), @@ -438,7 +432,7 @@ handle_in( case emqx_session:pubrec(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - NChannel = set_session(NSession, Channel), + NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}), @@ -458,7 +452,7 @@ handle_in( ) -> case emqx_session:pubrel(ClientInfo, PacketId, Session) of {ok, NSession} -> - NChannel = set_session(NSession, Channel), + NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), @@ -473,9 +467,9 @@ handle_in( ) -> case emqx_session:pubcomp(ClientInfo, PacketId, Session) of {ok, NSession} -> - {ok, set_session(NSession, Channel)}; + {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> - handle_out(publish, Publishes, set_session(NSession, Channel)); + handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.pubcomp.inuse'), {ok, Channel}; @@ -734,7 +728,7 @@ do_publish( case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of {ok, PubRes, NSession} -> RC = pubrec_reason_code(PubRes), - NChannel0 = set_session(NSession, Channel), + NChannel0 = Channel#channel{session = NSession}, NChannel1 = ensure_timer(await_timer, NChannel0), NChannel2 = ensure_quota(PubRes, NChannel1), handle_out(pubrec, {PacketId, RC}, NChannel2); @@ -830,7 +824,7 @@ do_subscribe( NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of {ok, NSession} -> - {QoS, set_session(NSession, Channel)}; + {QoS, Channel#channel{session = NSession}}; {error, RC} -> ?SLOG( warning, @@ -869,7 +863,7 @@ do_unsubscribe( TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of {ok, NSession} -> - {?RC_SUCCESS, set_session(NSession, Channel)}; + {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} end. @@ -898,7 +892,7 @@ process_disconnect(ReasonCode, Properties, Channel) -> maybe_update_expiry_interval( #{'Session-Expiry-Interval' := Interval}, - Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo} + Channel = #channel{conninfo = ConnInfo} ) -> EI = timer:seconds(Interval), OldEI = maps:get(expiry_interval, ConnInfo, 0), @@ -907,12 +901,11 @@ maybe_update_expiry_interval( Channel; false -> NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}}, - ClientID = maps:get(clientid, ClientInfo, undefined), %% Check if the client turns off persistence (turning it on is disallowed) case EI =:= 0 andalso OldEI > 0 of true -> - S = emqx_persistent_session:discard(ClientID, NChannel#channel.session), - set_session(S, NChannel); + NSession = emqx_session:unpersist(NChannel#channel.session), + NChannel#channel{session = NSession}; false -> NChannel end @@ -956,9 +949,7 @@ handle_deliver( Delivers1 = maybe_nack(Delivers), Delivers2 = emqx_session:ignore_local(ClientInfo, Delivers1, ClientId, Session), NSession = emqx_session:enqueue(ClientInfo, Delivers2, Session), - NChannel = set_session(NSession, Channel), - %% We consider queued/dropped messages as delivered since they are now in the session state. - maybe_mark_as_delivered(Session, Delivers), + NChannel = Channel#channel{session = NSession}, {ok, NChannel}; handle_deliver( Delivers, @@ -976,11 +967,10 @@ handle_deliver( ) of {ok, Publishes, NSession} -> - NChannel = set_session(NSession, Channel), - maybe_mark_as_delivered(NSession, Delivers), + NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); {ok, NSession} -> - {ok, set_session(NSession, Channel)} + {ok, Channel#channel{session = NSession}} end. %% Nack delivers from shared subscription @@ -996,15 +986,6 @@ not_nacked({deliver, _Topic, Msg}) -> true end. -maybe_mark_as_delivered(Session, Delivers) -> - case emqx_session:info(is_persistent, Session) of - false -> - skip; - true -> - SessionID = emqx_session:info(id, Session), - emqx_persistent_session:mark_as_delivered(SessionID, Delivers) - end. - %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- @@ -1096,11 +1077,11 @@ return_connack(AckPacket, Channel) -> ignore -> {ok, Replies, Channel}; {ok, Publishes, NSession} -> - NChannel0 = Channel#channel{ + NChannel1 = Channel#channel{ resuming = false, - pendings = [] + pendings = [], + session = NSession }, - NChannel1 = set_session(NSession, NChannel0), {Packets, NChannel2} = do_deliver(Publishes, NChannel1), Outgoing = [{outgoing, Packets} || length(Packets) > 0], {ok, Replies ++ Outgoing, NChannel2} @@ -1345,9 +1326,10 @@ handle_timeout( ) -> case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> - {ok, clean_timer(retry_timer, set_session(NSession, Channel))}; + NChannel = Channel#channel{session = NSession}, + {ok, clean_timer(retry_timer, NChannel)}; {ok, Publishes, Timeout, NSession} -> - NChannel = set_session(NSession, Channel), + NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; handle_timeout( @@ -1363,9 +1345,11 @@ handle_timeout( ) -> case emqx_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> - {ok, clean_timer(await_timer, set_session(NSession, Channel))}; + NChannel = Channel#channel{session = NSession}, + {ok, clean_timer(await_timer, NChannel)}; {ok, Timeout, NSession} -> - {ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))} + NChannel = Channel#channel{session = NSession}, + {ok, reset_timer(await_timer, Timeout, NChannel)} end; handle_timeout(_TRef, expire_session, Channel) -> shutdown(expired, Channel); @@ -1453,25 +1437,11 @@ terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg %% if will_msg still exists when the session is terminated, it %% must be published immediately. WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), - (Reason =:= expired) andalso persist_if_session(Channel), run_terminate_hook(Reason, Channel). -persist_if_session(#channel{session = Session} = Channel) -> - case emqx_session:is_session(Session) of - true -> - _ = emqx_persistent_session:persist( - Channel#channel.clientinfo, - Channel#channel.conninfo, - Channel#channel.session - ), - ok; - false -> - ok - end. - -run_terminate_hook(_Reason, #channel{session = undefined} = _Channel) -> +run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; -run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session} = _Channel) -> +run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) -> emqx_session:terminate(ClientInfo, Reason, Session). %%-------------------------------------------------------------------- @@ -2096,11 +2066,9 @@ maybe_resume_session(#channel{ session = Session, resuming = true, pendings = Pendings, - clientinfo = #{clientid := ClientId} = ClientInfo + clientinfo = ClientInfo }) -> {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), - %% We consider queued/dropped messages as delivered since they are now in the session state. - emqx_persistent_session:mark_as_delivered(ClientId, Pendings), case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c193cea44..6d18fef34 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -277,65 +277,24 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), CleanStart = fun(_) -> ok = discard_session(ClientId), - ok = emqx_persistent_session:discard_if_present(ClientId), - Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session1, present => false}} + ok = emqx_session:destroy(ClientId), + create_register_session(ClientInfo, ConnInfo, Self) end, emqx_cm_locker:trans(ClientId, CleanStart); open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> - CreateSess = - fun() -> - Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist( - ClientInfo, ConnInfo, Session - ), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session1, present => false}} - end, case takeover_session(ClientId) of - {persistent, Session} -> - %% This is a persistent session without a managing process. - {Session1, Pendings} = - emqx_persistent_session:resume(ClientInfo, ConnInfo, Session), - register_channel(ClientId, Self, ConnInfo), - - {ok, #{ - session => clean_session(Session1), - present => true, - pendings => clean_pendings(Pendings) - }}; {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of {ok, Pendings} -> - Session1 = emqx_persistent_session:persist( - ClientInfo, ConnInfo, Session - ), - register_channel(ClientId, Self, ConnInfo), - {ok, #{ - session => clean_session(Session1), - present => true, - pendings => clean_pendings(Pendings) - }}; + clean_register_session(Session, Pendings, ClientInfo, ConnInfo, Self); {error, _} -> - CreateSess() + create_register_session(ClientInfo, ConnInfo, Self) end; - {expired, OldSession} -> - _ = emqx_persistent_session:discard(ClientId, OldSession), - Session = create_session(ClientInfo, ConnInfo), - Session1 = emqx_persistent_session:persist( - ClientInfo, - ConnInfo, - Session - ), - register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session1, present => false}}; none -> - CreateSess() + create_register_session(ClientInfo, ConnInfo, Self) end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -347,6 +306,19 @@ create_session(ClientInfo, ConnInfo) -> ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. +create_register_session(ClientInfo = #{clientid := ClientId}, ConnInfo, ChanPid) -> + Session = create_session(ClientInfo, ConnInfo), + ok = register_channel(ClientId, ChanPid, ConnInfo), + {ok, #{session => Session, present => false}}. + +clean_register_session(Session, Pendings, #{clientid := ClientId}, ConnInfo, ChanPid) -> + ok = register_channel(ClientId, ChanPid, ConnInfo), + {ok, #{ + session => clean_session(Session), + present => true, + pendings => clean_pendings(Pendings) + }}. + get_session_confs(#{zone := Zone, clientid := ClientId}, #{ receive_maximum := MaxInflight, expiry_interval := EI }) -> @@ -385,7 +357,7 @@ get_mqtt_conf(Zone, Key) -> takeover_session(ClientId) -> case lookup_channels(ClientId) of [] -> - emqx_persistent_session:lookup(ClientId); + emqx_session:lookup(ClientId); [ChanPid] -> takeover_session(ClientId, ChanPid); ChanPids -> @@ -417,16 +389,16 @@ takeover_session(ClientId, Pid) -> %% request_stepdown/3 R == unexpected_exception -> - emqx_persistent_session:lookup(ClientId); + emqx_session:lookup(ClientId); % rpc_call/3 _:{'EXIT', {noproc, _}} -> - emqx_persistent_session:lookup(ClientId) + emqx_session:lookup(ClientId) end. do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> - emqx_persistent_session:lookup(ClientId); + emqx_session:lookup(ClientId); ConnMod when is_atom(ConnMod) -> case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 25bee629e..db0059709 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -47,13 +47,18 @@ -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. +-export([ + lookup/1, + destroy/1, + unpersist/1 +]). + -export([init/1]). -export([ @@ -226,6 +231,23 @@ init(Opts) -> created_at = erlang:system_time(millisecond) }. +-spec lookup(emqx_types:clientid()) -> none. +lookup(_ClientId) -> + % NOTE + % This is a stub. This session impl has no backing store, thus always `none`. + none. + +-spec destroy(emqx_types:clientid()) -> ok. +destroy(_ClientId) -> + % NOTE + % This is a stub. This session impl has no backing store, thus always `ok`. + ok. + +-spec unpersist(session()) -> session(). +unpersist(Session) -> + ok = destroy(Session#session.clientid), + Session#session{is_persistent = false}. + %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- @@ -242,6 +264,8 @@ info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(id, #session{id = Id}) -> Id; +info(clientid, #session{clientid = ClientId}) -> + ClientId; info(is_persistent, #session{is_persistent = Bool}) -> Bool; info(subscriptions, #session{subscriptions = Subs}) -> @@ -321,13 +345,12 @@ subscribe( ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, - Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs} + Session = #session{subscriptions = Subs} ) -> IsNew = not maps:is_key(TopicFilter, Subs), case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), - ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS), ok = emqx_hooks:run( 'session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}] @@ -355,12 +378,11 @@ unsubscribe( ClientInfo, TopicFilter, UnSubOpts, - Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS} + Session = #session{subscriptions = Subs} ) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), - ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS), ok = emqx_hooks:run( 'session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)] diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index cc583c632..07cfabc70 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -31,7 +31,10 @@ all() -> [ - {group, persistent_store_enabled}, + % NOTE + % Tests are disabled while existing session persistence impl is being + % phased out. + % {group, persistent_store_enabled}, {group, persistent_store_disabled} ]. diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 3fd21f389..936df0b0c 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -30,12 +30,8 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_eviction_agent, emqx_conf]). -init_per_testcase(t_persistence, Config) -> - emqx_config:put([persistent_session_store, enabled], true), - {ok, _} = emqx_persistent_session_sup:start_link(), - emqx_persistent_session:init_db_backend(), - ?assert(emqx_persistent_session:is_store_enabled()), - Config; +init_per_testcase(t_persistence, _Config) -> + {skip, "Existing session persistence implementation is being phased out"}; init_per_testcase(_TestCase, Config) -> Config.