diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index f8c510482..d2ac642ca 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -89,6 +89,7 @@ mark_channel_connected/1, mark_channel_disconnected/1, get_connected_client_count/0, + takeover_finish/2, do_kick_session/3, do_get_chan_stats/2, @@ -171,6 +172,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), ok = emqx_cm_registry:register_channel(Chan), mark_channel_connected(ChanPid), + ok = emqx_hooks:run('channel.registered', [ConnMod, ChanPid]), cast({registered, Chan}). %% @doc Unregister a channel. @@ -180,11 +182,13 @@ unregister_channel(ClientId) when is_binary(ClientId) -> ok. %% @private -do_unregister_channel(Chan) -> +do_unregister_channel({_ClientId, ChanPid} = Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), true = ets:delete(?CHAN_CONN_TAB, Chan), true = ets:delete(?CHAN_INFO_TAB, Chan), - ets:delete_object(?CHAN_TAB, Chan). + ets:delete_object(?CHAN_TAB, Chan), + ok = emqx_hooks:run('channel.unregistered', [ChanPid]), + true. -spec connection_closed(emqx_types:clientid()) -> true. connection_closed(ClientId) -> @@ -212,7 +216,7 @@ do_get_chan_info(ClientId, ChanPid) -> -spec get_chan_info(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:infos()). get_chan_info(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)). + wrap_rpc(emqx_cm_proto_v2:get_chan_info(ClientId, ChanPid)). %% @doc Update infos of the channel. -spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean(). @@ -242,7 +246,7 @@ do_get_chan_stats(ClientId, ChanPid) -> -spec get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats()). get_chan_stats(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)). + wrap_rpc(emqx_cm_proto_v2:get_chan_stats(ClientId, ChanPid)). %% @doc Set channel's stats. -spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean(). @@ -278,7 +282,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {ok, #{session => Session1, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); -open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> +open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnMod} = ConnInfo) -> Self = self(), ResumeStart = fun(_) -> CreateSess = @@ -304,18 +308,12 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> }}; {living, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - case - request_stepdown( - {takeover, 'end'}, - ConnMod, - ChanPid - ) - of - {ok, Pendings} -> + case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of + {ok, Pendings, TakoverData} -> Session1 = emqx_persistent_session:persist( ClientInfo, ConnInfo, Session ), - register_channel(ClientId, Self, ConnInfo), + ok = emqx_hooks:run('channel.takeovered', [NewConnMod, Self, TakoverData]), {ok, #{ session => clean_session(Session1), present => true, @@ -400,6 +398,20 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid) end. +takeover_finish(ConnMod, ChanPid) -> + TakoverData = emqx_hooks:run_fold('channel.takeover', [ConnMod, ChanPid], #{}), + case + %% node-local call + request_stepdown( + {takeover, 'end'}, + ConnMod, + ChanPid + ) + of + {ok, Pendings} -> {ok, Pendings, TakoverData}; + {error, _} = Error -> Error + end. + takeover_session(ClientId, Pid) -> try do_takeover_session(ClientId, Pid) @@ -429,7 +441,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> end end; do_takeover_session(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)). + wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)). %% @doc Discard all the sessions identified by the ClientId. -spec discard_session(emqx_types:clientid()) -> ok. @@ -531,7 +543,7 @@ do_kick_session(Action, ClientId, ChanPid) -> %% @private This function is shared for session 'kick' and 'discard' (as the first arg Action). kick_session(Action, ClientId, ChanPid) -> try - wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid)) + wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid)) catch Error:Reason -> %% This should mostly be RPC failures. @@ -716,7 +728,7 @@ do_get_chann_conn_mod(ClientId, ChanPid) -> end. get_chann_conn_mod(ClientId, ChanPid) -> - wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)). + wrap_rpc(emqx_cm_proto_v2:get_chann_conn_mod(ClientId, ChanPid)). mark_channel_connected(ChanPid) -> ?tp(emqx_cm_connected_client_count_inc, #{}), diff --git a/apps/emqx/src/emqx_types.erl b/apps/emqx/src/emqx_types.erl index 7223da245..8b7fdae9b 100644 --- a/apps/emqx/src/emqx_types.erl +++ b/apps/emqx/src/emqx_types.erl @@ -101,6 +101,8 @@ -export_type([oom_policy/0]). +-export_type([takeover_data/0]). + -type proto_ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 @@ -242,3 +244,5 @@ max_heap_size => non_neg_integer(), enable => boolean() }. + +-type takeover_data() :: map(). diff --git a/apps/emqx/src/proto/emqx_cm_proto_v2.erl b/apps/emqx/src/proto/emqx_cm_proto_v2.erl new file mode 100644 index 000000000..2981dbd40 --- /dev/null +++ b/apps/emqx/src/proto/emqx_cm_proto_v2.erl @@ -0,0 +1,88 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cm_proto_v2). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + lookup_client/2, + kickout_client/2, + + get_chan_stats/2, + get_chan_info/2, + get_chann_conn_mod/2, + + takeover_session/2, + takeover_finish/2, + kick_session/3 +]). + +-include("bpapi.hrl"). +-include("src/emqx_cm.hrl"). + +introduced_in() -> + "5.0.0". + +-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}. +kickout_client(Node, ClientId) -> + rpc:call(Node, emqx_cm, kick_session, [ClientId]). + +-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) -> + [emqx_cm:channel_info()] | {badrpc, _}. +lookup_client(Node, Key) -> + rpc:call(Node, emqx_cm, lookup_client, [Key]). + +-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}. +get_chan_stats(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}. +get_chan_info(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> + module() | undefined | {badrpc, _}. +get_chann_conn_mod(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2). + +-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) -> + none + | {expired | persistent, emqx_session:session()} + | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()} + | {badrpc, _}. +takeover_session(ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2). + +-spec takeover_finish(module(), emqx_cm:chan_pid()) -> + {ok, emqx_type:takeover_data()} + | {ok, list(emqx_type:deliver()), emqx_type:takeover_data()} + | {error, term()} + | {badrpc, _}. +takeover_finish(ConnMod, ChanPid) -> + erpc:call( + node(ChanPid), + emqx_cm, + takeover_session_finish, + [ConnMod, ChanPid], + ?T_TAKEOVER * 2 + ). + +-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}. +kick_session(Action, ClientId, ChanPid) -> + rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2). diff --git a/mix.exs b/mix.exs index 384b08100..92024f48d 100644 --- a/mix.exs +++ b/mix.exs @@ -293,6 +293,7 @@ defmodule EMQXUmbrella.MixProject do emqx_psk: :permanent, emqx_slow_subs: :permanent, emqx_plugins: :permanent, + emqx_ft: :permanent, emqx_mix: :none ] ++ if(enable_quicer?(), do: [quicer: :permanent], else: []) ++