From ac6693c8ccf0a8734ba3d9db754ed4cd48ae6a24 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 9 Jun 2023 18:14:35 +0800 Subject: [PATCH] refactor(mqttsn): takeover can resume the registrations of session --- apps/emqx_gateway/src/emqx_gateway_cm.erl | 2 +- .../src/emqx_mqttsn_channel.erl | 103 +++++++------ .../src/emqx_mqttsn_registry.erl | 46 +++--- .../src/emqx_mqttsn_session.erl | 144 ++++++++++++++++++ 4 files changed, 225 insertions(+), 70 deletions(-) create mode 100644 apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 814e37163..4c07d3938 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -389,7 +389,7 @@ open_session( end, case takeover_session(GwName, ClientId) of {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), + ok = SessionMod:resume(ClientInfo, Session), case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of {ok, Pendings} -> register_channel( diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index fb8aa76e4..720c288d3 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -51,8 +51,6 @@ -record(channel, { %% Context ctx :: emqx_gateway_ctx:context(), - %% Registry - registry :: emqx_mqttsn_registry:registry(), %% Gateway Id gateway_id :: integer(), %% Enable negative_qos @@ -62,7 +60,7 @@ %% MQTT-SN Client Info clientinfo :: emqx_types:clientinfo(), %% Session - session :: emqx_session:session() | undefined, + session :: emqx_mqttsn_session:session() | undefined, %% Keepalive keepalive :: emqx_keepalive:keepalive() | undefined, %% Will Msg @@ -147,7 +145,6 @@ init( ) -> Peercert = maps:get(peercert, ConnInfo, undefined), Mountpoint = maps:get(mountpoint, Option, undefined), - Registry = maps:get(registry, Option), GwId = maps:get(gateway_id, Option), EnableNegQoS = maps:get(enable_qos3, Option, true), ListenerId = @@ -180,7 +177,6 @@ init( ), #channel{ ctx = Ctx, - registry = Registry, gateway_id = GwId, enable_negative_qos = EnableNegQoS, conninfo = ConnInfo, @@ -217,7 +213,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_utils:maybe_apply(fun emqx_session:info/1, Session); + emqx_utils:maybe_apply(fun emqx_mqttsn_session:info/1, Session); info(will_msg, #channel{will_msg = WillMsg}) -> WillMsg; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> @@ -229,7 +225,7 @@ info(ctx, #channel{ctx = Ctx}) -> stats(#channel{session = undefined}) -> []; stats(#channel{session = Session}) -> - emqx_session:stats(Session). + emqx_mqttsn_session:stats(Session). set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. @@ -388,19 +384,15 @@ process_connect( clientinfo = ClientInfo } ) -> - SessFun = fun(ClientInfoT, _) -> - Conf = emqx_cm:get_session_confs( - ClientInfoT, #{receive_maximum => 1, expiry_interval => 0} - ), - emqx_session:init(Conf) - end, + SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end, case emqx_gateway_ctx:open_session( Ctx, CleanStart, ClientInfo, ConnInfo, - SessFun + SessFun, + _SessMod = emqx_mqttsn_session ) of {ok, #{ @@ -470,7 +462,7 @@ handle_in( MsgId, Data ), - Channel = #channel{conn_state = idle, registry = Registry} + Channel = #channel{conn_state = idle} ) -> case check_negative_qos_enable(Publish, Channel) of ok -> @@ -479,6 +471,7 @@ handle_in( ?SN_SHORT_TOPIC -> TopicId; ?SN_PREDEFINED_TOPIC -> + Registry = emqx_mqttsn_registry:init(), emqx_mqttsn_registry:lookup_topic(TopicId, Registry); _ -> undefined @@ -627,8 +620,9 @@ handle_in( end; handle_in( ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), - Channel = #channel{registry = Registry} + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:reg(TopicName, Registry) of {ok, TopicId, NRegistry} -> ?SLOG(debug, #{ @@ -637,7 +631,8 @@ handle_in( topic_id => TopicId }), AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), - {ok, {outgoing, AckPacket}, Channel#channel{registry = NRegistry}}; + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + {ok, {outgoing, AckPacket}, Channel#channel{session = NSession}}; {error, too_large} -> ?SLOG(error, #{ msg => "register_topic_failed", @@ -749,14 +744,14 @@ handle_in( ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), Channel = #channel{ ctx = Ctx, - registry = Registry, session = Session, clientinfo = ClientInfo } ) -> + Registry = emqx_mqttsn_session:registry(Session), case ReturnCode of ?SN_RC_ACCEPTED -> - case emqx_session:puback(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:puback(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( @@ -813,7 +808,7 @@ handle_in( clientinfo = ClientInfo } ) -> - case emqx_session:pubrec(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubrec(ClientInfo, MsgId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Channel), NChannel = Channel#channel{session = NSession}, @@ -839,7 +834,7 @@ handle_in( ?SN_PUBREC_MSG(?SN_PUBREL, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:pubrel(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubrel(ClientInfo, MsgId, Session) of {ok, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, MsgId, NChannel); @@ -856,7 +851,7 @@ handle_in( ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:pubcomp(ClientInfo, MsgId, Session) of + case emqx_mqttsn_session:pubcomp(ClientInfo, MsgId, Session) of {ok, NSession} -> {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( Channel#channel{session = NSession} @@ -1093,8 +1088,9 @@ convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) -> {ok, {TopicName, Flags, Data}, Channel}; convert_topic_id_to_name( {{id, TopicId}, Flags, Data}, - Channel = #channel{registry = Registry} + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; @@ -1164,7 +1160,7 @@ do_publish( Msg = #message{qos = ?QOS_2}, Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of + case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of {ok, _PubRes, NSession} -> NChannel1 = ensure_timer( await_timer, @@ -1197,8 +1193,9 @@ preproc_subs_type( TopicName, QoS ), - Channel = #channel{registry = Registry} + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), %% If the gateway is able accept the subscription, %% it assigns a topic id to the received topic name %% and returns it within a SUBACK message @@ -1214,7 +1211,8 @@ preproc_subs_type( %% topic name to be sent to the client, see also Section 6.10. {ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel}; {ok, TopicId, NRegistry} -> - {ok, {TopicId, TopicName, QoS}, Channel#channel{registry = NRegistry}} + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + {ok, {TopicId, TopicName, QoS}, Channel#channel{session = NSession}} end; preproc_subs_type( ?SN_SUBSCRIBE_MSG_TYPE( @@ -1222,8 +1220,9 @@ preproc_subs_type( TopicId, QoS ), - Channel = #channel{registry = Registry} + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, ?SN_RC_INVALID_TOPIC_ID}; @@ -1301,7 +1300,7 @@ do_subscribe( ) -> NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), - case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of + case emqx_mqttsn_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of {ok, NSession} -> {ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}}; {error, ?RC_QUOTA_EXCEEDED} -> @@ -1329,8 +1328,9 @@ preproc_unsub_type( ?SN_PREDEFINED_TOPIC, TopicId ), - Channel = #channel{registry = Registry} + Channel = #channel{session = Session} ) -> + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of undefined -> {error, not_found}; @@ -1391,7 +1391,7 @@ do_unsubscribe( SubOpts ), case - emqx_session:unsubscribe( + emqx_mqttsn_session:unsubscribe( ClientInfo, NTopicName, NSubOpts, @@ -1436,9 +1436,9 @@ awake( clientid => ClientId, previous_state => ConnState }), - {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session), {NPublishes, NSession} = - case emqx_session:deliver(ClientInfo, [], Session1) of + case emqx_mqttsn_session:deliver(ClientInfo, [], Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1466,8 +1466,8 @@ goto_asleep_if_buffered_msgs_sent( } ) -> case - emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso - emqx_inflight:is_empty(emqx_session:info(inflight, Session)) + emqx_mqueue:is_empty(emqx_mqttsn_session:info(mqueue, Session)) andalso + emqx_inflight:is_empty(emqx_mqttsn_session:info(inflight, Session)) of true -> ?SLOG(info, #{ @@ -1560,7 +1560,7 @@ handle_out( register_inflight = undefined } ) -> - {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), + {MsgId, NSession} = emqx_mqttsn_session:obtain_next_pkt_id(Session), Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)}, NChannel = Channel#channel{ session = NSession, @@ -1636,7 +1636,7 @@ maybe_resume_session( resuming = true } ) -> - Subs = emqx_session:info(subscriptions, Session), + Subs = emqx_mqttsn_session:info(subscriptions, Session), case subs_resume() andalso map_size(Subs) =/= 0 of true -> TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)), @@ -1661,9 +1661,9 @@ resume_or_replay_messages( false -> {[], Channel} end, - {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session), {NPublishes, NSession} = - case emqx_session:deliver(ClientInfo, NPendings, Session1) of + case emqx_mqttsn_session:deliver(ClientInfo, NPendings, Session1) of {ok, Session2} -> {Publishes, Session2}; {ok, More, Session2} -> @@ -1734,7 +1734,7 @@ outgoing_deliver_and_register({Packets, Channel}) -> message_to_packet( MsgId, Message, - #channel{registry = Registry} + #channel{session = Session} ) -> QoS = emqx_message:qos(Message), Topic = emqx_message:topic(Message), @@ -1744,6 +1744,7 @@ message_to_packet( ?QOS_0 -> 0; _ -> MsgId end, + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of {predef, PredefTopicId} -> Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC}, @@ -1779,7 +1780,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), reply_and_update(ok, NChannel); handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> - reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); + reply({ok, maps:to_list(emqx_mqttsn_session:info(subscriptions, Session))}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), shutdown_and_reply(kicked, ok, NChannel); @@ -1800,7 +1801,7 @@ handle_call( pendings = Pendings } ) -> - ok = emqx_session:takeover(Session), + ok = emqx_mqttsn_session:takeover(Session), %% TODO: Should not drain deliver here (side effect) Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), @@ -1877,7 +1878,8 @@ handle_info(clean_authz_cache, Channel) -> {ok, Channel}; handle_info({subscribe, _}, Channel) -> {ok, Channel}; -handle_info({register, TopicName}, Channel = #channel{registry = Registry}) -> +handle_info({register, TopicName}, Channel = #channel{session = Session}) -> + Registry = emqx_mqttsn_session:registry(Session), case emqx_mqttsn_registry:reg(TopicName, Registry) of {error, Reason} -> ?SLOG(error, #{ @@ -1887,7 +1889,8 @@ handle_info({register, TopicName}, Channel = #channel{registry = Registry}) -> }), {ok, Channel}; {ok, TopicId, NRegistry} -> - handle_out(register, {TopicId, TopicName}, Channel#channel{registry = NRegistry}) + NSession = emqx_mqttsn_session:set_registry(NRegistry, Session), + handle_out(register, {TopicId, TopicName}, Channel#channel{session = NSession}) end; handle_info(Info, Channel) -> ?SLOG(error, #{ @@ -1954,7 +1957,7 @@ handle_deliver( ConnState =:= disconnected; ConnState =:= asleep -> - NSession = emqx_session:enqueue( + NSession = emqx_mqttsn_session:enqueue( ClientInfo, ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), Session @@ -1990,7 +1993,7 @@ handle_deliver( } ) -> case - emqx_session:deliver( + emqx_mqttsn_session:deliver( ClientInfo, ignore_local(Delivers, ClientId, Session, Ctx), Session @@ -2008,7 +2011,7 @@ handle_deliver( end. ignore_local(Delivers, Subscriber, Session, Ctx) -> - Subs = emqx_session:info(subscriptions, Session), + Subs = emqx_mqttsn_session:info(subscriptions, Session), lists:filter( fun({deliver, Topic, #message{from = Publisher}}) -> case maps:find(Topic, Subs) of @@ -2083,7 +2086,7 @@ handle_timeout( retry_delivery, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:retry(ClientInfo, Session) of + case emqx_mqttsn_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -2108,7 +2111,7 @@ handle_timeout( expire_awaiting_rel, Channel = #channel{session = Session, clientinfo = ClientInfo} ) -> - case emqx_session:expire(ClientInfo, awaiting_rel, Session) of + case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of {ok, NSession} -> {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, Timeout, NSession} -> @@ -2252,9 +2255,9 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - emqx_session:info(retry_interval, Session); + emqx_mqttsn_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> - emqx_session:info(await_rel_timeout, Session). + emqx_mqttsn_session:info(await_rel_timeout, Session). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl index 59ce39d4b..ce3495c52 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_registry.erl @@ -90,29 +90,17 @@ init() -> | {error, term()}. reg( TopicName, - Registry = #{ - next_topic_id := TopicId0, - id_to_name := IdMap, - name_to_id := NameMap - } + Registry ) when is_binary(TopicName) -> case emqx_topic:wildcard(TopicName) of false -> - case maps:find(TopicName, NameMap) of - {ok, TopicId} -> + case lookup_topic_id(TopicName, Registry) of + {predef, TopicId} when is_integer(TopicId) -> {ok, TopicId, Registry}; - error -> - case next_topic_id(TopicId0) of - {error, too_large} -> - {error, too_large}; - NextTopicId -> - NRegistry = Registry#{ - next_topic_id := NextTopicId, - id_to_name := maps:put(NextTopicId, TopicName, IdMap), - name_to_id := maps:put(TopicName, NextTopicId, NameMap) - }, - {ok, NextTopicId, NRegistry} - end + TopicId when is_integer(TopicId) -> + {ok, TopicId, Registry}; + undefined -> + do_reg(TopicName, Registry) end; %% TopicId: in case of “accepted” the value that will be used as topic %% id by the gateway when sending PUBLISH messages to the client (not @@ -122,6 +110,26 @@ reg( {error, wildcard_topic} end. +do_reg( + TopicName, + Registry = #{ + next_topic_id := TopicId0, + id_to_name := IdMap, + name_to_id := NameMap + } +) -> + case next_topic_id(TopicId0) of + {error, too_large} -> + {error, too_large}; + NextTopicId -> + NRegistry = Registry#{ + next_topic_id := NextTopicId, + id_to_name := maps:put(NextTopicId, TopicName, IdMap), + name_to_id := maps:put(TopicName, NextTopicId, NameMap) + }, + {ok, NextTopicId, NRegistry} + end. + next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) -> Id + 1; next_topic_id(Id) when is_integer(Id) -> diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl new file mode 100644 index 000000000..7c62800cc --- /dev/null +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -0,0 +1,144 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqttsn_session). + +-export([registry/1, set_registry/2]). + +-export([ + init/1, + info/1, + info/2, + stats/1, + resume/2 +]). + +-export([ + publish/4, + subscribe/4, + unsubscribe/4, + puback/3, + pubrec/3, + pubrel/3, + pubcomp/3 +]). + +-export([ + replay/2, + deliver/3, + obtain_next_pkt_id/1, + takeover/1, + enqueue/3, + retry/2, + expire/3 +]). + +-type session() :: #{ + registry := emqx_mqttsn_registry:registry(), + session := emqx_session:session() +}. + +-export_type([session/0]). + +init(ClientInfo) -> + Conf = emqx_cm:get_session_confs( + ClientInfo, #{receive_maximum => 1, expiry_interval => 0} + ), + #{ + registry => emqx_mqttsn_registry:init(), + session => emqx_session:init(Conf) + }. + +registry(#{registry := Registry}) -> + Registry. + +set_registry(Registry, Session) -> + Session#{registry := Registry}. + +info(#{session := Session}) -> + emqx_session:info(Session). + +info(Key, #{session := Session}) -> + emqx_session:info(Key, Session). + +stats(#{session := Session}) -> + emqx_session:stats(Session). + +puback(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubrec(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubrel(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +pubcomp(ClientInfo, MsgId, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session). + +publish(ClientInfo, MsgId, Msg, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, MsgId, Msg], Session). + +subscribe(ClientInfo, Topic, SubOpts, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session). + +unsubscribe(ClientInfo, Topic, SubOpts, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session). + +replay(ClientInfo, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo], Session). + +deliver(ClientInfo, Delivers, Session1) -> + with_sess(?FUNCTION_NAME, [ClientInfo, Delivers], Session1). + +obtain_next_pkt_id(Session = #{session := Sess}) -> + {Id, Sess1} = emqx_session:obtain_next_pkt_id(Sess), + {Id, Session#{session := Sess1}}. + +takeover(_Session = #{session := Sess}) -> + emqx_session:takeover(Sess). + +enqueue(ClientInfo, Delivers, Session = #{session := Sess}) -> + Sess1 = emqx_session:enqueue(ClientInfo, Delivers, Sess), + Session#{session := Sess1}. + +retry(ClientInfo, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo], Session). + +expire(ClientInfo, awaiting_rel, Session) -> + with_sess(?FUNCTION_NAME, [ClientInfo, awaiting_rel], Session). + +resume(ClientInfo, #{session := Sess}) -> + emqx_session:resume(ClientInfo, Sess). + +%%-------------------------------------------------------------------- +%% internal funcs + +with_sess(Fun, Args, Session = #{session := Sess}) -> + case apply(emqx_session, Fun, Args ++ [Sess]) of + %% for subscribe + {error, Reason} -> + {error, Reason}; + %% for pubrel + {ok, Sess1} -> + {ok, Session#{session := Sess1}}; + %% for publish and puback + {ok, Result, Sess1} -> + {ok, Result, Session#{session := Sess1}}; + %% for puback + {ok, Msgs, Replies, Sess1} -> + {ok, Msgs, Replies, Session#{session := Sess1}} + end.