diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 51e1ed162..cba9094a2 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -33,8 +33,6 @@ , get_mqtt_conf/2 , get_mqtt_conf/3 , set_conn_state/2 - , get_session/1 - , set_session/2 , stats/1 , caps/1 ]). @@ -180,9 +178,6 @@ info(timers, #channel{timers = Timers}) -> Timers. set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. -get_session(#channel{session = Session}) -> - Session. - set_session(Session, Channel) -> Channel#channel{session = Session}. @@ -369,10 +364,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - {ok, Channel#channel{session = NSession}}; + {ok, set_session(NSession, Channel)}; {ok, Msg, Publishes, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - handle_out(publish, Publishes, Channel#channel{session = NSession}); + handle_out(publish, Publishes, set_session(NSession, Channel)); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}), ok = emqx_metrics:inc('packets.puback.inuse'), @@ -388,7 +383,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}), @@ -403,7 +398,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), @@ -414,9 +409,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> case emqx_session:pubcomp(PacketId, Session) of {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; + {ok, set_session(NSession, Channel)}; {ok, Publishes, NSession} -> - handle_out(publish, Publishes, Channel#channel{session = NSession}); + handle_out(publish, Publishes, set_session(NSession, Channel)); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.pubcomp.inuse'), {ok, Channel}; @@ -624,7 +619,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, case emqx_session:publish(PacketId, Msg, Session) of {ok, PubRes, NSession} -> RC = puback_reason_code(PubRes), - NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}), + NChannel0 = set_session(NSession, Channel), + NChannel1 = ensure_timer(await_timer, NChannel0), NChannel2 = ensure_quota(PubRes, NChannel1), handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -698,7 +694,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; + {QoS, set_session(NSession, Channel)}; {error, RC} -> ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", @@ -728,7 +724,7 @@ do_unsubscribe(TopicFilter, SubOpts, Channel = TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of {ok, NSession} -> - {?RC_SUCCESS, Channel#channel{session = NSession}}; + {?RC_SUCCESS, set_session(NSession, Channel)}; {error, RC} -> {RC, Channel} end. %%-------------------------------------------------------------------- @@ -765,8 +761,11 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, session = Session, clientinfo = #{clientid := ClientId}}) -> - NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), - {ok, Channel#channel{session = NSession}}; + Delivers1 = maybe_nack(Delivers), + Delivers2 = ignore_local(Delivers1, ClientId, Session), + NSession = emqx_session:enqueue(Delivers2, Session), + NChannel = set_session(NSession, Channel), + {ok, NChannel}; handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, @@ -779,10 +778,10 @@ handle_deliver(Delivers, Channel = #channel{session = Session, clientinfo = #{clientid := ClientId}}) -> case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); {ok, NSession} -> - {ok, Channel#channel{session = NSession}} + {ok, set_session(NSession, Channel)} end. ignore_local(Delivers, Subscriber, Session) -> @@ -898,13 +897,13 @@ return_connack(AckPacket, Channel) -> case maybe_resume_session(Channel) of ignore -> {ok, Replies, Channel}; {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession, - resuming = false, + NChannel0 = Channel#channel{resuming = false, pendings = [] }, - {Packets, NChannel1} = do_deliver(Publishes, NChannel), + NChannel1 = set_session(NSession, NChannel0), + {Packets, NChannel2} = do_deliver(Publishes, NChannel1), Outgoing = [{outgoing, Packets} || length(Packets) > 0], - {ok, Replies ++ Outgoing, NChannel1} + {ok, Replies ++ Outgoing, NChannel2} end. %%-------------------------------------------------------------------- @@ -1063,9 +1062,9 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{session = Session}) -> case emqx_session:retry(Session) of {ok, NSession} -> - {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(retry_timer, set_session(NSession, Channel))}; {ok, Publishes, Timeout, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; @@ -1076,9 +1075,9 @@ handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{session = Session}) -> case emqx_session:expire(awaiting_rel, Session) of {ok, NSession} -> - {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(await_timer, set_session(NSession, Channel))}; {ok, Timeout, NSession} -> - {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} + {ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))} end; handle_timeout(_TRef, expire_session, Channel) ->