From 1959e157037683eb696ffc0d255ca3614d9398de Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 14 Sep 2023 18:11:45 +0300 Subject: [PATCH] chore: simplify session eviction for node rebalance --- apps/emqx/src/emqx_cm.erl | 23 +-------------- .../src/emqx_eviction_agent.erl | 28 +++++++++++++------ .../test/emqx_eviction_agent_SUITE.erl | 6 ++-- .../emqx_eviction_agent_channel_SUITE.erl | 1 - .../test/emqx_eviction_agent_test_helpers.erl | 9 ++++++ changes/ee/feat-11612.en.md | 1 + 6 files changed, 33 insertions(+), 35 deletions(-) create mode 100644 changes/ee/feat-11612.en.md diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index e3c126629..f286a5056 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -77,7 +77,6 @@ %% Client management -export([ all_channels_table/1, - channel_with_session_table/1, live_connection_table/1 ]). @@ -564,27 +563,7 @@ all_channels() -> Pat = [{{'_', '$1'}, [], ['$1']}], ets:select(?CHAN_TAB, Pat). -%% @doc Get clientinfo for all clients with sessions -channel_with_session_table(ConnModuleList) -> - Ms = ets:fun2ms( - fun({{ClientId, _ChanPid}, Info, _Stats}) -> - {ClientId, Info} - end - ), - Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]), - ConnModules = sets:from_list(ConnModuleList, [{version, 2}]), - qlc:q([ - {ClientId, ConnState, ConnInfo, ClientInfo} - || {ClientId, #{ - conn_state := ConnState, - clientinfo := ClientInfo, - conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo - }} <- - Table, - sets:is_element(ConnModule, ConnModules) - ]). - -%% @doc Get clientinfo for all clients, regardless if they use clean start or not. +%% @doc Get clientinfo for all clients all_channels_table(ConnModuleList) -> Ms = ets:fun2ms( fun({{ClientId, _ChanPid}, Info, _Stats}) -> diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index ab2b9e66a..42cffcb3d 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -230,17 +230,17 @@ connection_table() -> connection_count() -> table_count(connection_table()). -channel_with_session_table(any) -> +channel_table(any) -> qlc:q([ {ClientId, ConnInfo, ClientInfo} || {ClientId, _, ConnInfo, ClientInfo} <- - emqx_cm:channel_with_session_table(?CONN_MODULES) + emqx_cm:all_channels_table(?CONN_MODULES) ]); -channel_with_session_table(RequiredConnState) -> +channel_table(RequiredConnState) -> qlc:q([ {ClientId, ConnInfo, ClientInfo} || {ClientId, ConnState, ConnInfo, ClientInfo} <- - emqx_cm:channel_with_session_table(?CONN_MODULES), + emqx_cm:all_channels_table(?CONN_MODULES), RequiredConnState =:= ConnState ]). @@ -269,13 +269,13 @@ all_channels_count() -> -spec all_local_channels_count() -> non_neg_integer(). all_local_channels_count() -> - table_count(emqx_cm:all_channels_table(?CONN_MODULES)). + table_count(channel_table(any)). session_count() -> session_count(any). session_count(ConnState) -> - table_count(channel_with_session_table(ConnState)). + table_count(channel_table(ConnState)). table_count(QH) -> qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH). @@ -298,8 +298,8 @@ take_channels(N) -> ok = qlc:delete_cursor(ChanPidCursor), Channels. -take_channel_with_sessions(N, ConnState) -> - ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)), +take_channels(N, ConnState) -> + ChanPidCursor = qlc:cursor(channel_table(ConnState)), Channels = qlc:next_answers(ChanPidCursor, N), ok = qlc:delete_cursor(ChanPidCursor), Channels. @@ -314,7 +314,7 @@ do_evict_connections(N, ServerReference) when N > 0 -> ). do_evict_sessions(N, Nodes, ConnState) when N > 0 -> - Channels = take_channel_with_sessions(N, ConnState), + Channels = take_channels(N, ConnState), ok = lists:foreach( fun({ClientId, ConnInfo, ClientInfo}) -> evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) @@ -346,6 +346,16 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) -> } ), {error, Reason}; + {error, {no_session, _}} = Error -> + ?SLOG( + warning, + #{ + msg => "evict_session_channel_no_session", + client_id => ClientId, + node => Node + } + ), + Error; {error, Reason} = Error -> ?SLOG( error, diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 3bbdcd707..bc6f626d2 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -15,7 +15,7 @@ -import( emqx_eviction_agent_test_helpers, - [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2] + [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1] ). -define(assertPrinted(Printed, Code), @@ -202,7 +202,7 @@ t_explicit_session_takeover(Config) -> ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]), - {ok, C1} = emqtt_connect([{port, Port1}]), + {ok, C1} = emqtt_connect_for_publish(Port1), emqtt:publish(C1, <<"t1">>, <<"MessageToEvictedSession1">>), ok = emqtt:disconnect(C1), @@ -229,7 +229,7 @@ t_explicit_session_takeover(Config) -> ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]), %% Session is on Node2, but we connect to Node1 - {ok, C2} = emqtt_connect([{port, Port1}]), + {ok, C2} = emqtt_connect_for_publish(Port1), emqtt:publish(C2, <<"t1">>, <<"MessageToEvictedSession2">>), ok = emqtt:disconnect(C2), diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 764306ce8..1d77fe170 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -9,7 +9,6 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_channel.hrl"). -define(CLIENT_ID, <<"client_with_session">>). diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl index 860436f67..b3b3e8767 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_test_helpers.erl @@ -8,6 +8,7 @@ emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, + emqtt_connect_for_publish/1, emqtt_connect_many/2, emqtt_connect_many/3, stop_many/1, @@ -42,6 +43,14 @@ emqtt_connect(Opts) -> {error, _} = Error -> Error end. +emqtt_connect_for_publish(Port) -> + ClientId = <<"pubclient-", (integer_to_binary(erlang:unique_integer([positive])))/binary>>, + {ok, C} = emqtt:start_link([{clientid, ClientId}, {port, Port}]), + case emqtt:connect(C) of + {ok, _} -> {ok, C}; + {error, _} = Error -> Error + end. + emqtt_connect_many(Port, Count) -> emqtt_connect_many(Port, Count, _StartN = 1). diff --git a/changes/ee/feat-11612.en.md b/changes/ee/feat-11612.en.md new file mode 100644 index 000000000..dcad59d40 --- /dev/null +++ b/changes/ee/feat-11612.en.md @@ -0,0 +1 @@ +During node evacuation, evacuate all disconnected sessions, not only those started with `clean_start` set to `false`.