diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c8296f317..66c1db36e 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -602,14 +602,14 @@ all_channels() -> ets:select(?CHAN_TAB, Pat). %% @doc Get clientinfo for all clients with sessions -channel_with_session_table(ConnModules) -> +channel_with_session_table(ConnModuleList) -> Ms = ets:fun2ms( fun({{ClientId, _ChanPid}, Info, _Stats}) -> {ClientId, Info} end ), Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]), - ConnModuleMap = maps:from_list([{Mod, true} || Mod <- ConnModules]), + ConnModules = sets:from_list(ConnModuleList, [{version, 2}]), qlc:q([ {ClientId, ConnState, ConnInfo, ClientInfo} || {ClientId, #{ @@ -618,7 +618,7 @@ channel_with_session_table(ConnModules) -> conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo }} <- Table, - maps:is_key(ConnModule, ConnModuleMap) + sets:is_element(ConnModule, ConnModules) ]). %% @doc Get all local connection query handle diff --git a/apps/emqx_utils/src/emqx_utils_api.erl b/apps/emqx_utils/src/emqx_utils_api.erl index e6bd07272..a1bc97cd6 100644 --- a/apps/emqx_utils/src/emqx_utils_api.erl +++ b/apps/emqx_utils/src/emqx_utils_api.erl @@ -72,4 +72,6 @@ is_running_node(Node) -> handle_result({ok, Result}) -> ?OK(Result); handle_result({error, Reason}) -> - ?BAD_REQUEST(Reason). + ?BAD_REQUEST(Reason); +handle_result({HTTPCode, Content}) when is_integer(HTTPCode) -> + {HTTPCode, Content}. diff --git a/lib-ee/emqx_eviction_agent/i18n/emqx_eviction_agent_api_i18n.conf b/lib-ee/emqx_eviction_agent/i18n/emqx_eviction_agent_api_i18n.conf deleted file mode 100644 index 8bb7282c3..000000000 --- a/lib-ee/emqx_eviction_agent/i18n/emqx_eviction_agent_api_i18n.conf +++ /dev/null @@ -1,14 +0,0 @@ -emqx_eviction_agent_api { - - node_eviction_status_get { - desc { - en: "Get the node eviction status" - zh: "获取节点驱逐状态" - } - label { - en: "Node Eviction Status" - zh: "节点驱逐状态" - } - } - -} diff --git a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl index b8e1b5236..9a29adc69 100644 --- a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -48,7 +48,9 @@ -export_type([server_reference/0]). --define(CONN_MODULES, [emqx_connection, emqx_ws_connection, emqx_eviction_agent_channel]). +-define(CONN_MODULES, [ + emqx_connection, emqx_ws_connection, emqx_quic_connection, emqx_eviction_agent_channel +]). %%-------------------------------------------------------------------- %% APIs diff --git a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_app.erl b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_app.erl index 63af59b09..90b09884f 100644 --- a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_app.erl +++ b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_app.erl @@ -6,8 +6,6 @@ -behaviour(application). --emqx_plugin(?MODULE). - -export([ start/2, stop/1 diff --git a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index a42033c0f..a6097f03d 100644 --- a/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/lib-ee/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -13,8 +13,6 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --logger_header("[Evicted Channel]"). - -export([ start_link/1, start_supervised/1, @@ -33,13 +31,6 @@ code_change/3 ]). --import( - emqx_misc, - [ - maybe_apply/2 - ] -). - -type opts() :: #{ conninfo := emqx_types:conninfo(), clientinfo := emqx_types:clientinfo() @@ -133,7 +124,7 @@ handle_call( ) -> ok = emqx_session:takeover(Session), %% TODO: Should not drain deliver here (side effect) - Delivers = emqx_misc:drain_deliver(), + Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), ?tp( debug, @@ -156,7 +147,7 @@ handle_call(Req, _From, Channel) -> {reply, ignored, Channel}. handle_info(Deliver = {deliver, _Topic, _Msg}, Channel) -> - Delivers = [Deliver | emqx_misc:drain_deliver()], + Delivers = [Deliver | emqx_utils:drain_deliver()], {noreply, handle_deliver(Delivers, Channel)}; handle_info(expire_session, Channel) -> {stop, expired, Channel}; @@ -186,7 +177,6 @@ code_change(_OldVsn, Channel, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -%% TODO: sync with emqx_channel handle_deliver( Delivers, #{ @@ -239,7 +229,7 @@ set_expiry_timer(#{conninfo := ConnInfo} = Channel) -> open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> Channel = channel(ConnInfo, ClientInfo), - case emqx_cm:open_session(false, ClientInfo, ConnInfo) of + case emqx_cm:open_session(_CleanSession = false, ClientInfo, ConnInfo) of {ok, #{present := false}} -> ?SLOG( info, @@ -259,7 +249,7 @@ open_session(ConnInfo, #{clientid := ClientId} = ClientInfo) -> node => node() } ), - Pendings1 = lists:usort(lists:append(Pendings0, emqx_misc:drain_deliver())), + Pendings1 = lists:usort(lists:append(Pendings0, emqx_utils:drain_deliver())), NSession = emqx_session:enqueue( ClientInfo, emqx_session:ignore_local( @@ -352,7 +342,7 @@ info(Channel) -> #{ conninfo => maps:get(conninfo, Channel, undefined), clientinfo => maps:get(clientinfo, Channel, undefined), - session => maybe_apply( + session => emqx_utils:maybe_apply( fun emqx_session:info/1, maps:get(session, Channel, undefined) ), diff --git a/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 0574ccec3..22b694d77 100644 --- a/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -362,13 +362,77 @@ t_will_msg(_Config) -> ok = emqtt:disconnect(C). +t_ws_conn(_Config) -> + erlang:process_flag(trap_exit, true), + + ClientId = <<"ws_client">>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {port, 8083}, + {ws_path, "/mqtt"} + ]), + {ok, _} = emqtt:ws_connect(C), + + ok = emqx_eviction_agent:enable(test_eviction, undefined), + + ?assertEqual( + 1, + emqx_eviction_agent:connection_count() + ), + + ?assertWaitEvent( + ok = emqx_eviction_agent:evict_connections(1), + #{?snk_kind := emqx_cm_connected_client_count_dec}, + 1000 + ), + + ?assertEqual( + 0, + emqx_eviction_agent:connection_count() + ). + +-ifndef(BUILD_WITHOUT_QUIC). + +t_quic_conn(_Config) -> + erlang:process_flag(trap_exit, true), + + QuicPort = emqx_common_test_helpers:select_free_port(quic), + application:ensure_all_started(quicer), + emqx_common_test_helpers:ensure_quic_listener(?MODULE, QuicPort), + + ClientId = <<"quic_client">>, + {ok, C} = emqtt:start_link([ + {proto_ver, v5}, + {clientid, ClientId}, + {port, QuicPort} + ]), + {ok, _} = emqtt:quic_connect(C), + + ok = emqx_eviction_agent:enable(test_eviction, undefined), + + ?assertEqual( + 1, + emqx_eviction_agent:connection_count() + ), + + ?assertWaitEvent( + ok = emqx_eviction_agent:evict_connections(1), + #{?snk_kind := emqx_cm_connected_client_count_dec}, + 1000 + ), + + ?assertEqual( + 0, + emqx_eviction_agent:connection_count() + ). + +-endif. + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- -% sn_connect_and_subscribe(ClientId, Topic) -> -% emqx_eviction_agent_test_helpers:sn_connect_and_subscribe(ClientId, Topic). - assert_receive_publish([]) -> ok; assert_receive_publish([#{payload := Msg, topic := Topic} | Rest]) -> diff --git a/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl b/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl index 8f88ebf97..3953ec3e2 100644 --- a/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl +++ b/lib-ee/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl @@ -81,12 +81,11 @@ start_cluster(NamesWithPorts, Apps, Env) -> NamesWithPorts ), Opts0 = [ - {env, [{emqx, boot_modules, [broker, listeners]}]}, + {env, [{emqx, boot_modules, [broker, listeners]}] ++ Env}, {apps, Apps}, {conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]] ++ - [{[rpc, mode], async}]}, - {env, Env} + [{[rpc, mode], async}]} ], Cluster = emqx_common_test_helpers:emqx_cluster( Specs, @@ -99,12 +98,6 @@ start_cluster(NamesWithPorts, Apps, Env) -> } || {Name, Opts} <- Cluster ], - ok = lists:foreach( - fun({Node, _Port}) -> - snabbkaffe:forward_trace(Node) - end, - NodesWithPorts - ), NodesWithPorts. stop_cluster(NodesWithPorts, Apps) -> diff --git a/lib-ee/emqx_node_rebalance/i18n/emqx_node_rebalance_api_i18n.conf b/lib-ee/emqx_node_rebalance/i18n/emqx_node_rebalance_api_i18n.conf deleted file mode 100644 index f5f161a92..000000000 --- a/lib-ee/emqx_node_rebalance/i18n/emqx_node_rebalance_api_i18n.conf +++ /dev/null @@ -1,490 +0,0 @@ -emqx_node_rebalance_api { - - ## API Request Fields - - load_rebalance_status { - desc { - en: "Get rebalance status of the current node" - zh: "获取当前节点的rebalance状态" - } - label { - en: "Get rebalance status" - zh: "获取rebalance状态" - } - } - - load_rebalance_global_status { - desc { - en: "Get status of all rebalance/evacuation processes across the cluster" - zh: "获取集群中所有rebalance/evacuation进程的状态" - } - label { - en: "Get global rebalance status" - zh: "获取全局rebalance状态" - } - } - - load_rebalance_availability_check { - desc { - en: "Check if the node is being evacuated or rebalanced" - zh: "检查节点是否正在被evacuate或rebalance" - } - label { - en: "Availability check" - zh: "可用性检查" - } - } - - load_rebalance_start { - desc { - en: "Start rebalance process" - zh: "启动rebalance进程" - } - label { - en: "Start rebalance" - zh: "启动rebalance" - } - } - - load_rebalance_stop { - desc { - en: "Stop rebalance process" - zh: "停止rebalance进程" - } - label { - en: "Stop rebalance" - zh: "停止rebalance" - } - } - - load_rebalance_evacuation_start { - desc { - en: "Start evacuation process" - zh: "启动evacuation进程" - } - label { - en: "Start evacuation" - zh: "启动evacuation" - } - } - - load_rebalance_evacuation_stop { - desc { - en: "Stop evacuation process" - zh: "停止evacuation进程" - } - label { - en: "Stop evacuation" - zh: "停止evacuation" - } - } - - param_node { - desc { - en: "Node name" - zh: "节点名称" - } - label { - en: "Node name" - zh: "节点名称" - } - } - - wait_health_check { - desc { - en: "Time to wait before starting the rebalance process, in seconds" - zh: "启动rebalance进程前等待的时间,单位为秒" - } - label { - en: "Wait health check" - zh: "等待健康检查" - } - } - - conn_evict_rate { - desc { - en: "The rate of evicting connections, in connections per second" - zh: "逐出连接的速率,以每秒连接数表示" - } - label { - en: "Connection eviction rate" - zh: "连接驱逐率" - } - } - - sess_evict_rate { - desc { - en: "The rate of evicting sessions, in sessions per second" - zh: "逐出会话的速率,以每秒会话为单位" - } - label { - en: "Session eviction rate" - zh: "会话驱逐率" - } - } - - abs_conn_threshold { - desc { - en: "Maximum desired difference between the number of connections on the node and the average number of connections on the recipient nodes" - zh: "节点上的连接数与接收节点上的平均连接数之间的最大期望差值" - } - label { - en: "Absolute connection threshold" - zh: "绝对连接阈值" - } - } - - rel_conn_threshold { - desc { - en: "Maximum desired fraction between the number of connections on the node and the average number of connections on the recipient nodes" - zh: "节点上的连接数与接收节点上的平均连接数之间的最大期望分数" - } - label { - en: "Relative connection threshold" - zh: "相对连接阈值" - } - } - - abs_sess_threshold { - desc { - en: "Maximum desired difference between the number of sessions on the node and the average number of sessions on the recipient nodes" - zh: "节点上的会话数与接收节点上的平均会话数之间的最大期望差异" - } - label { - en: "Absolute session threshold" - zh: "绝对会话阈值" - } - } - - rel_sess_threshold { - desc { - en: "Maximum desired fraction between the number of sessions on the node and the average number of sessions on the recipient nodes" - zh: "节点上的会话数与接收节点上的平均会话数之间的最大期望分数" - } - label { - en: "Relative session threshold" - zh: "相对会话阈值" - } - } - - wait_takeover { - desc { - en: "Time to wait before starting session evacuation process, in seconds" - zh: "开始会话疏散过程之前等待的时间,以秒为单位" - } - label { - en: "Wait takeover" - zh: "等待接管" - } - } - - redirect_to { - desc { - en: "Server reference to redirect clients to (MQTTv5 Server redirection)" - zh: "将客户端重定向到的服务器参考(MQTTv5 服务器重定向)" - } - label { - en: "Redirect to" - zh: "重定向至" - } - } - - migrate_to { - desc { - en: "Nodes to migrate sessions to" - zh: "将会话迁移到的节点" - } - label { - en: "Migrate to" - zh: "迁移到" - } - } - - rebalance_nodes { - desc { - en: "Nodes to participate in rebalance" - zh: "参与rebalance的节点" - } - label { - en: "Rebalance nodes" - zh: "重新平衡节点" - } - } - - ## API Response Fields - - local_status_enabled { - desc { - en: "Whether the node is being evacuated" - zh: "节点是否正在撤离" - } - label { - en: "Local evacuation status" - zh: "当地避难状况" - } - } - - local_status_process { - desc { - en: "The process that is being performed on the node: evacuation or rebalance" - zh: "正在节点上执行的过程:疏散或重新平衡" - } - label { - en: "Node process" - zh: "节点进程" - } - } - - local_status_state { - desc { - en: "The state of the process that is being performed on the node" - zh: "正在节点上执行的进程的状态" - } - label { - en: "Rebalance/evacuation current state" - zh: "重新平衡/疏散当前状态" - } - } - - local_status_coordinator_node { - desc { - en: "The node that is coordinating rebalance process" - zh: "协调再平衡过程的节点" - } - label { - en: "Coordinator node" - zh: "协调节点" - } - } - - local_status_connection_eviction_rate { - desc { - en: "The rate of evicting connections, in connections per second" - zh: "逐出连接的速率,以每秒连接数表示" - } - label { - en: "Connection eviction rate" - zh: "连接驱逐率" - } - } - - local_status_session_eviction_rate { - desc { - en: "The rate of evicting sessions, in sessions per second" - zh: "逐出会话的速率,以每秒会话为单位" - } - label { - en: "Session eviction rate" - zh: "会话驱逐率" - } - } - - local_status_connection_goal { - desc { - en: "The number of connections that the node should have after the rebalance/evacuation process" - zh: "节点在重新平衡/疏散过程后应该拥有的连接数" - } - label { - en: "Connection goal" - zh: "连接目标" - } - } - - local_status_session_goal { - desc { - en: "The number of sessions that the node should have after the evacuation process" - zh: "疏散过程后节点应有的会话数" - } - label { - en: "Session goal" - zh: "会话目标" - } - } - - local_status_disconnected_session_goal { - desc { - en: "The number of disconnected sessions that the node should have after the rebalance process" - zh: "重新平衡过程后节点应具有的断开连接的会话数" - } - label { - en: "Disconnected session goal" - zh: "断开连接的会话目标" - } - } - - local_status_session_recipients { - desc { - en: "List of nodes to which sessions are being evacuated" - zh: "会话被疏散到的节点列表" - } - label { - en: "Session recipients" - zh: "会话收件人" - } - } - - local_status_recipients { - desc { - en: "List of nodes to which connections/sessions are being evacuated during rebalance" - zh: "在重新平衡期间连接/会话被疏散到的节点列表" - } - label { - en: "Recipients" - zh: "收件人" - } - } - - local_status_stats { - desc { - en: "Statistics of the evacuation/rebalance process" - zh: "疏散/再平衡过程的统计" - } - label { - en: "Statistics" - zh: "统计数据" - } - } - - status_stats_initial_connected { - desc { - en: "The number of connections on the node before the evacuation/rebalance process" - zh: "疏散/重新平衡过程之前节点上的连接数" - } - label { - en: "Initial connected" - zh: "初始连接" - } - } - - status_stats_current_connected { - desc { - en: "Current number of connections on the node" - zh: "节点上的当前连接数" - } - label { - en: "Current connections" - zh: "当前连接" - } - } - - status_stats_initial_sessions { - desc { - en: "The number of sessions on the node before the evacuation/rebalance process" - zh: "疏散/重新平衡过程之前节点上的会话数" - } - label { - en: "Initial sessions" - zh: "初始会话" - } - } - - status_stats_current_sessions { - desc { - en: "Current number of sessions on the node" - zh: "节点上的当前会话数" - } - label { - en: "Current sessions" - zh: "当前会话" - } - } - - status_stats_current_disconnected_sessions { - desc { - en: "Current number of disconnected sessions on the node" - zh: "节点上当前断开连接的会话数" - } - label { - en: "Current disconnected sessions" - zh: "当前断开连接的会话" - } - } - - coordinator_status_donors { - desc { - en: "List of nodes from which connections/sessions are being evacuated" - zh: "正在疏散连接/会话的节点列表" - } - label { - en: "Donors" - zh: "捐助者" - } - } - - coordinator_status_donor_conn_avg { - desc { - en: "Average number of connections per donor node" - zh: "每个供体节点的平均连接数" - } - label { - en: "Donor connections average" - zh: "捐助者连接平均值" - } - } - - coordinator_status_donor_sess_avg { - desc { - en: "Average number of sessions per donor node" - zh: "每个供体节点的平均会话数" - } - label { - en: "Donor sessions average" - zh: "平均捐助会议" - } - } - - coordinator_status_node { - desc { - en: "The node that is coordinating the evacuation/rebalance process" - zh: "协调疏散/再平衡过程的节点" - } - label { - en: "Coordinator node" - zh: "协调节点" - } - } - - evacuation_status_node { - desc { - en: "The node that is being evacuated" - zh: "正在撤离的节点" - } - label { - en: "Evacuated node" - zh: "疏散节点" - } - } - - global_status_evacuations { - desc { - en: "List of nodes that are being evacuated" - zh: "正在撤离的节点列表" - } - label { - en: "Evacuations" - zh: "疏散" - } - } - - global_status_rebalances { - desc { - en: "List of nodes that coordinate a rebalance" - zh: "协调再平衡的节点列表" - } - label { - en: "Rebalances" - zh: "再平衡" - } - } - - empty_response { - desc { - en: "The response is empty" - zh: "响应为空" - } - label { - en: "Empty response" - zh: "空响应" - } - } -} diff --git a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_api.erl b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_api.erl index fa322d146..1f6328a63 100644 --- a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_api.erl +++ b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_api.erl @@ -8,6 +8,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). %% Swagger specs from hocon schema -export([ @@ -44,9 +45,9 @@ -import(emqx_dashboard_swagger, [error_codes/2]). -define(BAD_REQUEST, 'BAD_REQUEST'). --define(NODE_UNAVAILABLE, 'NODE_UNAVAILABLE'). -define(NODE_EVACUATING, 'NODE_EVACUATING'). -define(RPC_ERROR, 'RPC_ERROR'). +-define(NOT_FOUND, 'NOT_FOUND'). %%-------------------------------------------------------------------- %% API Spec @@ -120,7 +121,8 @@ schema("/load_rebalance/:node/start") -> ), responses => #{ 200 => response_schema(), - 400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>) + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 404 => error_codes([?NOT_FOUND], <<"Not Found">>) } } }; @@ -134,7 +136,8 @@ schema("/load_rebalance/:node/stop") -> parameters => [param_node()], responses => #{ 200 => response_schema(), - 400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>) + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 404 => error_codes([?NOT_FOUND], <<"Not Found">>) } } }; @@ -153,7 +156,8 @@ schema("/load_rebalance/:node/evacuation/start") -> ), responses => #{ 200 => response_schema(), - 400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>) + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 404 => error_codes([?NOT_FOUND], <<"Not Found">>) } } }; @@ -167,7 +171,8 @@ schema("/load_rebalance/:node/evacuation/stop") -> parameters => [param_node()], responses => #{ 200 => response_schema(), - 400 => error_codes([?BAD_REQUEST, ?NODE_UNAVAILABLE], <<"Bad Request">>) + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 404 => error_codes([?NOT_FOUND], <<"Not Found">>) } } }. @@ -205,7 +210,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> end. '/load_rebalance/:node/start'(post, #{bindings := #{node := NodeBin}, body := Params0}) -> - with_node(NodeBin, fun(Node) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> Params1 = translate(rebalance_start, Params0), with_nodes_at_key(nodes, Params1, fun(Params2) -> wrap_rpc( @@ -215,7 +220,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> end). '/load_rebalance/:node/stop'(post, #{bindings := #{node := NodeBin}}) -> - with_node(NodeBin, fun(Node) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> wrap_rpc( Node, emqx_node_rebalance_api_proto_v1:node_rebalance_stop(Node) ) @@ -224,7 +229,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> '/load_rebalance/:node/evacuation/start'(post, #{ bindings := #{node := NodeBin}, body := Params0 }) -> - with_node(NodeBin, fun(Node) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> Params1 = translate(rebalance_evacuation_start, Params0), with_nodes_at_key(migrate_to, Params1, fun(Params2) -> wrap_rpc( @@ -237,7 +242,7 @@ schema("/load_rebalance/:node/evacuation/stop") -> end). '/load_rebalance/:node/evacuation/stop'(post, #{bindings := #{node := NodeBin}}) -> - with_node(NodeBin, fun(Node) -> + emqx_utils_api:with_node(NodeBin, fun(Node) -> wrap_rpc( Node, emqx_node_rebalance_api_proto_v1:node_rebalance_evacuation_stop(Node) ) @@ -288,19 +293,13 @@ validate_nodes(Key, Params) when is_map_key(Key, Params) -> validate_nodes(_Key, Params) -> {ok, Params}. -with_node(BinNode, Fun) -> - case parse_node(BinNode) of - {ok, Node} -> Fun(Node); - {error, _} -> error_response(400, ?BAD_REQUEST, [<<"Invalid node: ">>, BinNode]) - end. - with_nodes_at_key(Key, Params, Fun) -> Res = validate_nodes(Key, Params), case Res of {ok, Params1} -> Fun(Params1); {error, {unavailable, Nodes}} -> - error_response(400, ?NODE_UNAVAILABLE, io_lib:format("Nodes unavailable: ~p", [Nodes])); + error_response(400, ?NOT_FOUND, io_lib:format("Nodes unavailable: ~p", [Nodes])); {error, {invalid, Nodes}} -> error_response(400, ?BAD_REQUEST, io_lib:format("Invalid nodes: ~p", [Nodes])) end. @@ -322,10 +321,7 @@ format_as_map_list(List) -> ). error_response(HttpCode, Code, Message) -> - {HttpCode, #{ - code => atom_to_binary(Code), - message => iolist_to_binary(Message) - }}. + {HttpCode, ?ERROR_MSG(Code, Message)}. without(Keys, Props) -> lists:filter( @@ -470,11 +466,10 @@ fields(rebalance_evacuation_start) -> )}, {"migrate_to", mk( - list(binary()), + nonempty_list(binary()), #{ desc => ?DESC(migrate_to), - required => false, - validator => [fun(Values) -> length(Values) > 0 end] + required => false } )} ]; diff --git a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl index a2706f13b..3bafb9ffe 100644 --- a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl +++ b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_cli.erl @@ -43,7 +43,7 @@ cli(["start" | StartArgs]) -> false end; cli(["node-status", NodeStr]) -> - case emqx_misc:safe_to_existing_atom(NodeStr, utf8) of + case emqx_utils:safe_to_existing_atom(NodeStr, utf8) of {ok, Node} -> node_status(emqx_node_rebalance_status:local_status(Node)); {error, _} -> @@ -297,7 +297,7 @@ strings_to_atoms(Strings) -> strings_to_atoms([], Atoms, Invalid) -> {lists:reverse(Atoms), lists:reverse(Invalid)}; strings_to_atoms([Str | Rest], Atoms, Invalid) -> - case emqx_misc:safe_to_existing_atom(Str, utf8) of + case emqx_utils:safe_to_existing_atom(Str, utf8) of {ok, Atom} -> strings_to_atoms(Rest, [Atom | Atoms], Invalid); {error, _} -> diff --git a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl index 3fc9faeea..6b145c699 100644 --- a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl +++ b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_evacuation_persist.erl @@ -55,7 +55,7 @@ save( Filepath = evacuation_filepath(), case filelib:ensure_dir(Filepath) of ok -> - JsonData = emqx_json:encode( + JsonData = emqx_utils_json:encode( prepare_for_encode(maps:with(persist_keys(), Data)), [pretty] ), @@ -72,7 +72,7 @@ clear() -> read(DefaultOpts) -> case file:read_file(evacuation_filepath()) of {ok, Data} -> - case emqx_json:safe_decode(Data, [return_maps]) of + case emqx_utils_json:safe_decode(Data, [return_maps]) of {ok, Map} when is_map(Map) -> {ok, map_to_opts(DefaultOpts, Map)}; _NotAMap -> diff --git a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_status.erl b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_status.erl index 63675a3da..1d45d64e8 100644 --- a/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_status.erl +++ b/lib-ee/emqx_node_rebalance/src/emqx_node_rebalance_status.erl @@ -208,7 +208,7 @@ format_local_status_field({session_goal, SessGoal}) -> format_local_status_field({disconnected_session_goal, DisconnSessGoal}) -> io_lib:format("Disconnected session goal: ~p~n", [DisconnSessGoal]); format_local_status_field({session_recipients, SessionRecipients}) -> - io_lib:format("Session recipient nodes: ~p~n", [SessionRecipients]); + io_lib:format("Session recipient nodes: ~p~n", [SessionRecipients]); format_local_status_field({recipients, Recipients}) -> io_lib:format("Recipient nodes: ~p~n", [Recipients]); format_local_status_field({donors, Donors}) -> diff --git a/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl b/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl index 21608b8bc..d8202a33e 100644 --- a/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl +++ b/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_api_SUITE.erl @@ -88,7 +88,7 @@ t_start_evacuation_validation(Config) -> BadOpts ), ?assertMatch( - {ok, 400, #{}}, + {ok, 404, #{}}, api_post( ["load_rebalance", "bad@node", "evacuation", "start"], #{} @@ -148,7 +148,7 @@ t_start_rebalance_validation(Config) -> BadOpts ), ?assertMatch( - {ok, 400, #{}}, + {ok, 404, #{}}, api_post( ["load_rebalance", "bad@node", "start"], #{} @@ -346,7 +346,7 @@ t_start_stop_rebalance(Config) -> } ] }}, - api_get(["load_rebalance", "global_status"]) + GlobalStatusResponse ), ?assertMatch( diff --git a/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl b/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl index cdafad97a..5d774ba7c 100644 --- a/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl +++ b/lib-ee/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl @@ -22,21 +22,20 @@ all() -> [{group, one_node}, {group, two_node}]. groups() -> [ - {one_node, [], [ - t_agent_busy, - t_already_started, - t_not_started, - t_start, - t_persistence, - t_unknown_messages - ]}, - {two_node, [], [ - t_conn_evicted, - t_migrate_to, - t_session_evicted - ]} + {one_node, [], one_node_cases()}, + {two_node, [], two_node_cases()} ]. +two_node_cases() -> + [ + t_conn_evicted, + t_migrate_to, + t_session_evicted + ]. + +one_node_cases() -> + emqx_common_test_helpers:all(?MODULE) -- two_node_cases(). + init_per_suite(Config) -> ok = emqx_common_test_helpers:start_apps([]), Config. diff --git a/rel/i18n/emqx_eviction_agent_api.hocon b/rel/i18n/emqx_eviction_agent_api.hocon new file mode 100644 index 000000000..40566fca6 --- /dev/null +++ b/rel/i18n/emqx_eviction_agent_api.hocon @@ -0,0 +1,9 @@ +emqx_eviction_agent_api { + +node_eviction_status_get.desc: +"""Get the node eviction status""" + +node_eviction_status_get.label: +"""Node Eviction Status""" + +} diff --git a/rel/i18n/emqx_node_rebalance_api.hocon b/rel/i18n/emqx_node_rebalance_api.hocon new file mode 100644 index 000000000..51d0fa8bc --- /dev/null +++ b/rel/i18n/emqx_node_rebalance_api.hocon @@ -0,0 +1,267 @@ +emqx_node_rebalance_api { + +load_rebalance_status.desc: +"""Get rebalance status of the current node""" + +load_rebalance_status.label: +"""Get rebalance status""" + +load_rebalance_global_status.desc: +"""Get status of all rebalance/evacuation processes across the cluster""" + +load_rebalance_global_status.label: +"""Get global rebalance status""" + +load_rebalance_availability_check.desc: +"""Check if the node is being evacuated or rebalanced""" + +load_rebalance_availability_check.label: +"""Availability check""" + +load_rebalance_start.desc: +"""Start rebalance process""" + +load_rebalance_start.label: +"""Start rebalance""" + +load_rebalance_stop.desc: +"""Stop rebalance process""" + +load_rebalance_stop.label: +"""Stop rebalance""" + +load_rebalance_evacuation_start.desc: +"""Start evacuation process""" + +load_rebalance_evacuation_start.label: +"""Start evacuation""" + +load_rebalance_evacuation_stop.desc: +"""Stop evacuation process""" + +load_rebalance_evacuation_stop.label: +"""Stop evacuation""" + +param_node.desc: +"""Node name""" + +param_node.label: +"""Node name""" + +wait_health_check.desc: +"""Time to wait before starting the rebalance process, in seconds""" + +wait_health_check.label: +"""Wait health check""" + +conn_evict_rate.desc: +"""The rate of evicting connections, in connections per second""" + +conn_evict_rate.label: +"""Connection eviction rate""" + +sess_evict_rate.desc: +"""The rate of evicting sessions, in sessions per second""" + +sess_evict_rate.label: +"""Session eviction rate""" + +abs_conn_threshold.desc: +"""Maximum desired difference between the number of connections on the node and the average number of connections on the recipient nodes""" + +abs_conn_threshold.label: +"""Absolute connection threshold""" + +rel_conn_threshold.desc: +"""Maximum desired fraction between the number of connections on the node and the average number of connections on the recipient nodes""" + +rel_conn_threshold.label: +"""Relative connection threshold""" + +abs_sess_threshold.desc: +"""Maximum desired difference between the number of sessions on the node and the average number of sessions on the recipient nodes""" + +abs_sess_threshold.label: +"""Absolute session threshold""" + +rel_sess_threshold.desc: +"""Maximum desired fraction between the number of sessions on the node and the average number of sessions on the recipient nodes""" + +rel_sess_threshold.label: +"""Relative session threshold""" + +wait_takeover.desc: +"""Time to wait before starting session evacuation process, in seconds""" + +wait_takeover.label: +"""Wait takeover""" + +redirect_to.desc: +"""Server reference to redirect clients to (MQTTv5 Server redirection)""" + +redirect_to.label: +"""Redirect to""" + +migrate_to.desc: +"""Nodes to migrate sessions to""" + +migrate_to.label: +"""Migrate to""" + +rebalance_nodes.desc: +"""Nodes to participate in rebalance""" + +rebalance_nodes.label: +"""Rebalance nodes""" + +local_status_enabled.desc: +"""Whether the node is being evacuated""" + +local_status_enabled.label: +"""Local evacuation status""" + +local_status_process.desc: +"""The process that is being performed on the node: evacuation or rebalance""" + +local_status_process.label: +"""Node process""" + +local_status_state.desc: +"""The state of the process that is being performed on the node""" + +local_status_state.label: +"""Rebalance/evacuation current state""" + +local_status_coordinator_node.desc: +"""The node that is coordinating rebalance process""" + +local_status_coordinator_node.label: +"""Coordinator node""" + +local_status_connection_eviction_rate.desc: +"""The rate of evicting connections, in connections per second""" + +local_status_connection_eviction_rate.label: +"""Connection eviction rate""" + +local_status_session_eviction_rate.desc: +"""The rate of evicting sessions, in sessions per second""" + +local_status_session_eviction_rate.label: +"""Session eviction rate""" + +local_status_connection_goal.desc: +"""The number of connections that the node should have after the rebalance/evacuation process""" + +local_status_connection_goal.label: +"""Connection goal""" + +local_status_session_goal.desc: +"""The number of sessions that the node should have after the evacuation process""" + +local_status_session_goal.label: +"""Session goal""" + +local_status_disconnected_session_goal.desc: +"""The number of disconnected sessions that the node should have after the rebalance process""" + +local_status_disconnected_session_goal.label: +"""Disconnected session goal""" + +local_status_session_recipients.desc: +"""List of nodes to which sessions are being evacuated""" + +local_status_session_recipients.label: +"""Session recipients""" + +local_status_recipients.desc: +"""List of nodes to which connections/sessions are being evacuated during rebalance""" + +local_status_recipients.label: +"""Recipients""" + +local_status_stats.desc: +"""Statistics of the evacuation/rebalance process""" + +local_status_stats.label: +"""Statistics""" + +status_stats_initial_connected.desc: +"""The number of connections on the node before the evacuation/rebalance process""" + +status_stats_initial_connected.label: +"""Initial connected""" + +status_stats_current_connected.desc: +"""Current number of connections on the node""" + +status_stats_current_connected.label: +"""Current connections""" + +status_stats_initial_sessions.desc: +"""The number of sessions on the node before the evacuation/rebalance process""" + +status_stats_initial_sessions.label: +"""Initial sessions""" + +status_stats_current_sessions.desc: +"""Current number of sessions on the node""" + +status_stats_current_sessions.label: +"""Current sessions""" + +status_stats_current_disconnected_sessions.desc: +"""Current number of disconnected sessions on the node""" + +status_stats_current_disconnected_sessions.label: +"""Current disconnected sessions""" + +coordinator_status_donors.desc: +"""List of nodes from which connections/sessions are being evacuated""" + +coordinator_status_donors.label: +"""Donors""" + +coordinator_status_donor_conn_avg.desc: +"""Average number of connections per donor node""" + +coordinator_status_donor_conn_avg.label: +"""Donor connections average""" + +coordinator_status_donor_sess_avg.desc: +"""Average number of sessions per donor node""" + +coordinator_status_donor_sess_avg.label: +"""Donor sessions average""" + +coordinator_status_node.desc: +"""The node that is coordinating the evacuation/rebalance process""" + +coordinator_status_node.label: +"""Coordinator node""" + +evacuation_status_node.desc: +"""The node that is being evacuated""" + +evacuation_status_node.label: +"""Evacuated node""" + +global_status_evacuations.desc: +"""List of nodes that are being evacuated""" + +global_status_evacuations.label: +"""Evacuations""" + +global_status_rebalances.desc: +"""List of nodes that coordinate a rebalance""" + +global_status_rebalances.label: +"""Rebalances""" + +empty_response.desc: +"""The response is empty""" + +empty_response.label: +"""Empty response""" + +} diff --git a/rel/i18n/zh/emqx_eviction_agent_api.hocon b/rel/i18n/zh/emqx_eviction_agent_api.hocon new file mode 100644 index 000000000..a4d9f5c12 --- /dev/null +++ b/rel/i18n/zh/emqx_eviction_agent_api.hocon @@ -0,0 +1,9 @@ +emqx_eviction_agent_api { + +node_eviction_status_get.desc: +"""获取节点驱逐状态""" + +node_eviction_status_get.label: +"""节点驱逐状态""" + +} diff --git a/rel/i18n/zh/emqx_node_rebalance_api.hocon b/rel/i18n/zh/emqx_node_rebalance_api.hocon new file mode 100644 index 000000000..3066158b3 --- /dev/null +++ b/rel/i18n/zh/emqx_node_rebalance_api.hocon @@ -0,0 +1,267 @@ +emqx_node_rebalance_api { + +load_rebalance_status.desc: +"""获取当前节点的rebalance状态""" + +load_rebalance_status.label: +"""获取rebalance状态""" + +load_rebalance_global_status.desc: +"""获取集群中所有rebalance/evacuation进程的状态""" + +load_rebalance_global_status.label: +"""获取全局rebalance状态""" + +load_rebalance_availability_check.desc: +"""检查节点是否正在被evacuate或rebalance""" + +load_rebalance_availability_check.label: +"""可用性检查""" + +load_rebalance_start.desc: +"""启动rebalance进程""" + +load_rebalance_start.label: +"""启动rebalance""" + +load_rebalance_stop.desc: +"""停止rebalance进程""" + +load_rebalance_stop.label: +"""停止rebalance""" + +load_rebalance_evacuation_start.desc: +"""启动evacuation进程""" + +load_rebalance_evacuation_start.label: +"""启动evacuation""" + +load_rebalance_evacuation_stop.desc: +"""停止evacuation进程""" + +load_rebalance_evacuation_stop.label: +"""停止evacuation""" + +param_node.desc: +"""节点名称""" + +param_node.label: +"""节点名称""" + +wait_health_check.desc: +"""启动rebalance进程前等待的时间,单位为秒""" + +wait_health_check.label: +"""等待健康检查""" + +conn_evict_rate.desc: +"""逐出连接的速率,以每秒连接数表示""" + +conn_evict_rate.label: +"""连接驱逐率""" + +sess_evict_rate.desc: +"""逐出会话的速率,以每秒会话为单位""" + +sess_evict_rate.label: +"""会话驱逐率""" + +abs_conn_threshold.desc: +"""节点上的连接数与接收节点上的平均连接数之间的最大期望差值""" + +abs_conn_threshold.label: +"""绝对连接阈值""" + +rel_conn_threshold.desc: +"""节点上的连接数与接收节点上的平均连接数之间的最大期望分数""" + +rel_conn_threshold.label: +"""相对连接阈值""" + +abs_sess_threshold.desc: +"""节点上的会话数与接收节点上的平均会话数之间的最大期望差异""" + +abs_sess_threshold.label: +"""绝对会话阈值""" + +rel_sess_threshold.desc: +"""节点上的会话数与接收节点上的平均会话数之间的最大期望分数""" + +rel_sess_threshold.label: +"""相对会话阈值""" + +wait_takeover.desc: +"""开始会话疏散过程之前等待的时间,以秒为单位""" + +wait_takeover.label: +"""等待接管""" + +redirect_to.desc: +"""将客户端重定向到的服务器参考(MQTTv5 服务器重定向)""" + +redirect_to.label: +"""重定向至""" + +migrate_to.desc: +"""将会话迁移到的节点""" + +migrate_to.label: +"""迁移到""" + +rebalance_nodes.desc: +"""参与rebalance的节点""" + +rebalance_nodes.label: +"""重新平衡节点""" + +local_status_enabled.desc: +"""节点是否正在撤离""" + +local_status_enabled.label: +"""当地避难状况""" + +local_status_process.desc: +"""正在节点上执行的过程:疏散或重新平衡""" + +local_status_process.label: +"""节点进程""" + +local_status_state.desc: +"""正在节点上执行的进程的状态""" + +local_status_state.label: +"""重新平衡/疏散当前状态""" + +local_status_coordinator_node.desc: +"""协调再平衡过程的节点""" + +local_status_coordinator_node.label: +"""协调节点""" + +local_status_connection_eviction_rate.desc: +"""逐出连接的速率,以每秒连接数表示""" + +local_status_connection_eviction_rate.label: +"""连接驱逐率""" + +local_status_session_eviction_rate.desc: +"""逐出会话的速率,以每秒会话为单位""" + +local_status_session_eviction_rate.label: +"""会话驱逐率""" + +local_status_connection_goal.desc: +"""节点在重新平衡/疏散过程后应该拥有的连接数""" + +local_status_connection_goal.label: +"""连接目标""" + +local_status_session_goal.desc: +"""疏散过程后节点应有的会话数""" + +local_status_session_goal.label: +"""会话目标""" + +local_status_disconnected_session_goal.desc: +"""重新平衡过程后节点应具有的断开连接的会话数""" + +local_status_disconnected_session_goal.label: +"""断开连接的会话目标""" + +local_status_session_recipients.desc: +"""会话被疏散到的节点列表""" + +local_status_session_recipients.label: +"""会话收件人""" + +local_status_recipients.desc: +"""在重新平衡期间连接/会话被疏散到的节点列表""" + +local_status_recipients.label: +"""收件人""" + +local_status_stats.desc: +"""疏散/再平衡过程的统计""" + +local_status_stats.label: +"""统计数据""" + +status_stats_initial_connected.desc: +"""疏散/重新平衡过程之前节点上的连接数""" + +status_stats_initial_connected.label: +"""初始连接""" + +status_stats_current_connected.desc: +"""节点上的当前连接数""" + +status_stats_current_connected.label: +"""当前连接""" + +status_stats_initial_sessions.desc: +"""疏散/重新平衡过程之前节点上的会话数""" + +status_stats_initial_sessions.label: +"""初始会话""" + +status_stats_current_sessions.desc: +"""节点上的当前会话数""" + +status_stats_current_sessions.label: +"""当前会话""" + +status_stats_current_disconnected_sessions.desc: +"""节点上当前断开连接的会话数""" + +status_stats_current_disconnected_sessions.label: +"""当前断开连接的会话""" + +coordinator_status_donors.desc: +"""正在疏散连接/会话的节点列表""" + +coordinator_status_donors.label: +"""捐助者""" + +coordinator_status_donor_conn_avg.desc: +"""每个供体节点的平均连接数""" + +coordinator_status_donor_conn_avg.label: +"""捐助者连接平均值""" + +coordinator_status_donor_sess_avg.desc: +"""每个供体节点的平均会话数""" + +coordinator_status_donor_sess_avg.label: +"""平均捐助会议""" + +coordinator_status_node.desc: +"""协调疏散/再平衡过程的节点""" + +coordinator_status_node.label: +"""协调节点""" + +evacuation_status_node.desc: +"""正在撤离的节点""" + +evacuation_status_node.label: +"""疏散节点""" + +global_status_evacuations.desc: +"""正在撤离的节点列表""" + +global_status_evacuations.label: +"""疏散""" + +global_status_rebalances.desc: +"""协调再平衡的节点列表""" + +global_status_rebalances.label: +"""再平衡""" + +empty_response.desc: +"""响应为空""" + +empty_response.label: +"""空响应""" +} +