From e6ecc6ca60f274c421dae77c97166ebdff39af6c Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Tue, 7 Dec 2021 14:07:40 +0100 Subject: [PATCH 1/2] fix(emqx_channel): fix race condition in session takeover Sessions must not enqueue messages when another process is taking over the client id, since it already passed on the message queue in the session state. Without this fix, messages arriving after `{takeover, 'begin'}` to a channel with no connection (i.e., a persistent session) would be lost. --- apps/emqx/src/emqx_channel.erl | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 50f9fcf70..1a4a8c1aa 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -782,6 +782,19 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}). + +handle_deliver(Delivers, Channel = #channel{takeover = true, + pendings = Pendings, + session = Session, + clientinfo = #{clientid := ClientId}}) -> + %% NOTE: Order is important here. While the takeover is in + %% progress, the session cannot enqueue messages, since it already + %% passed on the queue to the new connection in the session state. 
+ NPendings = lists:append( + Pendings, + emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), + {ok, Channel#channel{pendings = NPendings}}; + handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, session = Session, clientinfo = #{clientid := ClientId}}) -> @@ -793,15 +806,6 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, maybe_mark_as_delivered(Session, Delivers), {ok, NChannel}; -handle_deliver(Delivers, Channel = #channel{takeover = true, - pendings = Pendings, - session = Session, - clientinfo = #{clientid := ClientId}}) -> - NPendings = lists:append( - Pendings, - emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), - {ok, Channel#channel{pendings = NPendings}}; - handle_deliver(Delivers, Channel = #channel{session = Session, clientinfo = #{clientid := ClientId} }) -> From 14bef1ba312c0260131a8017748c166cffdc8eb1 Mon Sep 17 00:00:00 2001 From: Tobias Lindahl Date: Tue, 7 Dec 2021 16:14:32 +0100 Subject: [PATCH 2/2] refactor: make clauses safe for future reordering --- apps/emqx/src/emqx_channel.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 1a4a8c1aa..126c5e641 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -796,6 +796,7 @@ handle_deliver(Delivers, Channel = #channel{takeover = true, {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, + takeover = false, session = Session, clientinfo = #{clientid := ClientId}}) -> Delivers1 = maybe_nack(Delivers), @@ -807,6 +808,7 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, {ok, NChannel}; handle_deliver(Delivers, Channel = #channel{session = Session, + takeover = false, clientinfo = #{clientid := ClientId} }) -> case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of