Merge pull request #7340 from zmstone/fix-session-record-backward-compatibility

fix(session): compatible to 4.3 takeover
This commit is contained in:
Zaiming (Stone) Shi 2022-03-20 07:22:16 +01:00 committed by GitHub
commit 145b3a363c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 4 deletions

View File

@ -3,6 +3,7 @@
{VSN,
[{"4.4.1",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{add_module,emqx_relup},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
@ -40,6 +41,7 @@
{<<".*">>,[]}],
[{"4.4.1",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]},
{delete_module,emqx_relup},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},

View File

@ -242,7 +242,8 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
{ok, #{session => Session, present => false}}
end,
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
{ok, ConnMod, ChanPid, Session0} ->
Session = emqx_session:upgrade(ClientInfo, Session0),
ok = emqx_session:resume(ClientInfo, Session),
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} ->
@ -289,7 +290,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
ConnMod when is_atom(ConnMod) ->
case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} ->
{ok, ConnMod, ChanPid, Session};
{ok, ConnMod, ChanPid, emqx_session:downgrade(Session)};
{error, Reason} ->
{error, Reason}
end

View File

@ -83,6 +83,8 @@
-export([ takeover/1
, resume/2
, replay/2
, upgrade/2
, downgrade/1
]).
-export([expire/2]).
@ -163,6 +165,7 @@
-ifdef(TEST).
-define(GET_CLIENT_ID(C), maps:get(clientid, C, <<>>)).
-export([dummy/0]).
-else.
-define(GET_CLIENT_ID(C), maps:get(clientid, C)).
-endif.
@ -196,6 +199,22 @@ init_mqueue(Zone) ->
default_priority => get_env(Zone, mqueue_default_priority, lowest)
}).
%% @doc uprade from 4.3
upgrade(CInfo, S) ->
?LOG(warning, "upgrading from 4.3", []),
[session | Fields] = tuple_to_list(S),
#session{} = list_to_tuple([session, ?GET_CLIENT_ID(CInfo) | Fields] ++ [#{}]).
%% @doc Downgrade to 4.3
downgrade(#session{} = S) ->
?LOG(warning, "downgrading to 4.3", []),
[session, _ClientID | Fields] = tuple_to_list(S),
list_to_tuple([session | lists:reverse(tl(lists:reverse(Fields)))]).
-ifdef(TEST).
dummy() ->
#session{}.
-endif.
%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------

View File

@ -332,15 +332,17 @@ t_discard_session_race(_) ->
t_takeover_session(_) ->
#{conninfo := ConnInfo} = ?ChanInfo,
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
Dummy = emqx_session:dummy(),
Downgraded = emqx_session:downgrade(Dummy),
erlang:spawn_link(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo),
receive
{'$gen_call', From, {takeover, 'begin'}} ->
gen_server:reply(From, test), ok
gen_server:reply(From, Dummy), ok
end
end),
timer:sleep(100),
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
{ok, emqx_connection, _, Downgraded} = emqx_cm:takeover_session(<<"clientid">>),
emqx_cm:unregister_channel(<<"clientid">>).
t_all_channels(_) ->