diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 50f9fcf70..1a4a8c1aa 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -782,6 +782,19 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}). + +handle_deliver(Delivers, Channel = #channel{takeover = true, + pendings = Pendings, + session = Session, + clientinfo = #{clientid := ClientId}}) -> + %% NOTE: Order is important here. While the takeover is in + %% progress, the session cannot enqueue messages, since it already + %% passed on the queue to the new connection in the session state. + NPendings = lists:append( + Pendings, + emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), + {ok, Channel#channel{pendings = NPendings}}; + handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, session = Session, clientinfo = #{clientid := ClientId}}) -> @@ -793,15 +806,6 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, maybe_mark_as_delivered(Session, Delivers), {ok, NChannel}; -handle_deliver(Delivers, Channel = #channel{takeover = true, - pendings = Pendings, - session = Session, - clientinfo = #{clientid := ClientId}}) -> - NPendings = lists:append( - Pendings, - emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), - {ok, Channel#channel{pendings = NPendings}}; - handle_deliver(Delivers, Channel = #channel{session = Session, clientinfo = #{clientid := ClientId} }) ->