fix(emqx_channel): fix race condition in session takeover
Sessions must not enqueue messages when another process is taking over the client id, since it already passed on the message queue in the session state. Without this fix, messages arriving after `{takeover, 'begin'} to a channel with no connection (i.e., a persistent session) would be lost.
This commit is contained in:
parent
4eef9a5bac
commit
e6ecc6ca60
|
@ -782,6 +782,19 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
|
|
||||||
-spec(handle_deliver(list(emqx_types:deliver()), channel())
|
-spec(handle_deliver(list(emqx_types:deliver()), channel())
|
||||||
-> {ok, channel()} | {ok, replies(), channel()}).
|
-> {ok, channel()} | {ok, replies(), channel()}).
|
||||||
|
|
||||||
|
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
||||||
|
pendings = Pendings,
|
||||||
|
session = Session,
|
||||||
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
|
%% NOTE: Order is important here. While the takeover is in
|
||||||
|
%% progress, the session cannot enqueue messages, since it already
|
||||||
|
%% passed on the queue to the new connection in the session state.
|
||||||
|
NPendings = lists:append(
|
||||||
|
Pendings,
|
||||||
|
emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
|
||||||
|
{ok, Channel#channel{pendings = NPendings}};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
|
@ -793,15 +806,6 @@ handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
||||||
maybe_mark_as_delivered(Session, Delivers),
|
maybe_mark_as_delivered(Session, Delivers),
|
||||||
{ok, NChannel};
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
|
||||||
pendings = Pendings,
|
|
||||||
session = Session,
|
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
|
||||||
NPendings = lists:append(
|
|
||||||
Pendings,
|
|
||||||
emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)),
|
|
||||||
{ok, Channel#channel{pendings = NPendings}};
|
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{session = Session,
|
handle_deliver(Delivers, Channel = #channel{session = Session,
|
||||||
clientinfo = #{clientid := ClientId}
|
clientinfo = #{clientid := ClientId}
|
||||||
}) ->
|
}) ->
|
||||||
|
|
Loading…
Reference in New Issue