From 522302fee1c7817049715a200e102a44ded1aa94 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 9 Oct 2023 15:42:36 +0700 Subject: [PATCH] fix(cm): bring back pre-v5.3.0 compat in `takeover_session_begin/1` Which was accidentally broken in bf164175. --- apps/emqx/src/emqx_cm.erl | 15 +++++++++++---- apps/emqx/src/proto/emqx_cm_proto_v2.erl | 4 +++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index cea22652d..1e4940965 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -298,9 +298,9 @@ takeover_session_begin(ClientId) -> takeover_session_begin(ClientId, ChanPid) when is_pid(ChanPid) -> case takeover_session(ClientId, ChanPid) of - {living, ConnMod, Session} -> + {living, ConnMod, ChanPid, Session} -> {ok, Session, {ConnMod, ChanPid}}; - none -> + _ -> none end; takeover_session_begin(_ClientId, undefined) -> @@ -368,13 +368,20 @@ do_takeover_begin(ClientId, ChanPid) when node(ChanPid) == node() -> ConnMod when is_atom(ConnMod) -> case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> - {living, ConnMod, Session}; + {living, ConnMod, ChanPid, Session}; {error, Reason} -> error(Reason) end end; do_takeover_begin(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)). + case wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)) of + %% NOTE: v5.3.0 + {living, ConnMod, Session} -> + {living, ConnMod, ChanPid, Session}; + %% NOTE: other versions + Res -> + Res + end. %% @doc Discard all the sessions identified by the ClientId. -spec discard_session(emqx_types:clientid()) -> ok. diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl index 29dec50cd..6ccf596c3 100644 --- a/apps/emqx/src/proto/emqx_cm_proto_v2.erl +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -65,8 +65,10 @@ get_chann_conn_mod(ClientId, ChanPid) -> -spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> none - | {expired | persistent, emqx_session:session()} | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + %% NOTE: v5.3.0 + | {living, _ConnMod :: atom(), emqx_session:session()} + | {expired | persistent, emqx_session:session()} | {badrpc, _}. takeover_session(ClientId, ChanPid) -> rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).