diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 68ca21a3b..4678aca89 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ {VSN, [{"4.4.1", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {add_module,emqx_relup}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, @@ -40,6 +41,7 @@ {<<".*">>,[]}], [{"4.4.1", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_session,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {delete_module,emqx_relup}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 4aa953caa..4b06f35a8 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -242,7 +242,8 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {ok, #{session => Session, present => false}} end, case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> + {ok, ConnMod, ChanPid, Session0} -> + Session = emqx_session:upgrade(ClientInfo, Session0), ok = emqx_session:resume(ClientInfo, Session), case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of {ok, Pendings} -> @@ -289,7 +290,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> ConnMod when is_atom(ConnMod) -> case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of {ok, Session} -> - {ok, ConnMod, ChanPid, Session}; + {ok, ConnMod, ChanPid, emqx_session:downgrade(Session)}; {error, Reason} -> {error, Reason} end diff --git a/src/emqx_session.erl b/src/emqx_session.erl index bb5e84ac4..0364bde61 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -83,6 +83,8 @@ -export([ takeover/1 , resume/2 , replay/2 + , upgrade/2 + , downgrade/1 ]). -export([expire/2]). @@ -163,6 +165,7 @@ -ifdef(TEST). -define(GET_CLIENT_ID(C), maps:get(clientid, C, <<>>)). +-export([dummy/0]). -else. -define(GET_CLIENT_ID(C), maps:get(clientid, C)). -endif. @@ -196,6 +199,22 @@ init_mqueue(Zone) -> default_priority => get_env(Zone, mqueue_default_priority, lowest) }). +%% @doc uprade from 4.3 +upgrade(CInfo, S) -> + ?LOG(warning, "upgrading from 4.3", []), + [session | Fields] = tuple_to_list(S), + #session{} = list_to_tuple([session, ?GET_CLIENT_ID(CInfo) | Fields] ++ [#{}]). + +%% @doc Downgrade to 4.3 +downgrade(#session{} = S) -> + ?LOG(warning, "downgrading to 4.3", []), + [session, _ClientID | Fields] = tuple_to_list(S), + list_to_tuple([session | lists:reverse(tl(lists:reverse(Fields)))]). + +-ifdef(TEST). +dummy() -> + #session{}. +-endif. %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 4fcaaf473..84b362c0a 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -332,15 +332,17 @@ t_discard_session_race(_) -> t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), + Dummy = emqx_session:dummy(), + Downgraded = emqx_session:downgrade(Dummy), erlang:spawn_link(fun() -> ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), receive {'$gen_call', From, {takeover, 'begin'}} -> - gen_server:reply(From, test), ok + gen_server:reply(From, Dummy), ok end end), timer:sleep(100), - {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), + {ok, emqx_connection, _, Downgraded} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). t_all_channels(_) ->