Merge pull request #6396 from emqx/fix-takeover-race-on-enqueued-messages-v4.3

This commit is contained in:
Tobias Lindahl 2021-12-08 11:01:25 +01:00 committed by GitHub
commit 756a256137
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 7 deletions

View File

@ -720,20 +720,25 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
-spec(handle_deliver(list(emqx_types:deliver()), channel())
-> {ok, channel()} | {ok, replies(), channel()}).
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
%% NOTE: Order is important here. While the takeover is in
%% progress, the session cannot enqueue messages, since it already
%% passed on the queue to the new connection in the session state.
NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
takeover = false,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{session = Session,
takeover = false,
clientinfo = #{clientid := ClientId}}) ->
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
{ok, Publishes, NSession} ->