chore: simplify session eviction for node rebalance

This commit is contained in:
Ilya Averyanov 2023-09-14 18:11:45 +03:00 committed by Zaiming (Stone) Shi
parent 6919cd4df0
commit 1959e15703
6 changed files with 33 additions and 35 deletions

View File

@ -77,7 +77,6 @@
%% Client management %% Client management
-export([ -export([
all_channels_table/1, all_channels_table/1,
channel_with_session_table/1,
live_connection_table/1 live_connection_table/1
]). ]).
@ -564,27 +563,7 @@ all_channels() ->
Pat = [{{'_', '$1'}, [], ['$1']}], Pat = [{{'_', '$1'}, [], ['$1']}],
ets:select(?CHAN_TAB, Pat). ets:select(?CHAN_TAB, Pat).
%% @doc Get clientinfo for all clients with sessions %% @doc Get clientinfo for all clients
channel_with_session_table(ConnModuleList) ->
Ms = ets:fun2ms(
fun({{ClientId, _ChanPid}, Info, _Stats}) ->
{ClientId, Info}
end
),
Table = ets:table(?CHAN_INFO_TAB, [{traverse, {select, Ms}}]),
ConnModules = sets:from_list(ConnModuleList, [{version, 2}]),
qlc:q([
{ClientId, ConnState, ConnInfo, ClientInfo}
|| {ClientId, #{
conn_state := ConnState,
clientinfo := ClientInfo,
conninfo := #{clean_start := false, conn_mod := ConnModule} = ConnInfo
}} <-
Table,
sets:is_element(ConnModule, ConnModules)
]).
%% @doc Get clientinfo for all clients, regardless if they use clean start or not.
all_channels_table(ConnModuleList) -> all_channels_table(ConnModuleList) ->
Ms = ets:fun2ms( Ms = ets:fun2ms(
fun({{ClientId, _ChanPid}, Info, _Stats}) -> fun({{ClientId, _ChanPid}, Info, _Stats}) ->

View File

@ -230,17 +230,17 @@ connection_table() ->
connection_count() -> connection_count() ->
table_count(connection_table()). table_count(connection_table()).
channel_with_session_table(any) -> channel_table(any) ->
qlc:q([ qlc:q([
{ClientId, ConnInfo, ClientInfo} {ClientId, ConnInfo, ClientInfo}
|| {ClientId, _, ConnInfo, ClientInfo} <- || {ClientId, _, ConnInfo, ClientInfo} <-
emqx_cm:channel_with_session_table(?CONN_MODULES) emqx_cm:all_channels_table(?CONN_MODULES)
]); ]);
channel_with_session_table(RequiredConnState) -> channel_table(RequiredConnState) ->
qlc:q([ qlc:q([
{ClientId, ConnInfo, ClientInfo} {ClientId, ConnInfo, ClientInfo}
|| {ClientId, ConnState, ConnInfo, ClientInfo} <- || {ClientId, ConnState, ConnInfo, ClientInfo} <-
emqx_cm:channel_with_session_table(?CONN_MODULES), emqx_cm:all_channels_table(?CONN_MODULES),
RequiredConnState =:= ConnState RequiredConnState =:= ConnState
]). ]).
@ -269,13 +269,13 @@ all_channels_count() ->
-spec all_local_channels_count() -> non_neg_integer(). -spec all_local_channels_count() -> non_neg_integer().
all_local_channels_count() -> all_local_channels_count() ->
table_count(emqx_cm:all_channels_table(?CONN_MODULES)). table_count(channel_table(any)).
session_count() -> session_count() ->
session_count(any). session_count(any).
session_count(ConnState) -> session_count(ConnState) ->
table_count(channel_with_session_table(ConnState)). table_count(channel_table(ConnState)).
table_count(QH) -> table_count(QH) ->
qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH). qlc:fold(fun(_, Acc) -> Acc + 1 end, 0, QH).
@ -298,8 +298,8 @@ take_channels(N) ->
ok = qlc:delete_cursor(ChanPidCursor), ok = qlc:delete_cursor(ChanPidCursor),
Channels. Channels.
take_channel_with_sessions(N, ConnState) -> take_channels(N, ConnState) ->
ChanPidCursor = qlc:cursor(channel_with_session_table(ConnState)), ChanPidCursor = qlc:cursor(channel_table(ConnState)),
Channels = qlc:next_answers(ChanPidCursor, N), Channels = qlc:next_answers(ChanPidCursor, N),
ok = qlc:delete_cursor(ChanPidCursor), ok = qlc:delete_cursor(ChanPidCursor),
Channels. Channels.
@ -314,7 +314,7 @@ do_evict_connections(N, ServerReference) when N > 0 ->
). ).
do_evict_sessions(N, Nodes, ConnState) when N > 0 -> do_evict_sessions(N, Nodes, ConnState) when N > 0 ->
Channels = take_channel_with_sessions(N, ConnState), Channels = take_channels(N, ConnState),
ok = lists:foreach( ok = lists:foreach(
fun({ClientId, ConnInfo, ClientInfo}) -> fun({ClientId, ConnInfo, ClientInfo}) ->
evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo)
@ -346,6 +346,16 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
} }
), ),
{error, Reason}; {error, Reason};
{error, {no_session, _}} = Error ->
?SLOG(
warning,
#{
msg => "evict_session_channel_no_session",
client_id => ClientId,
node => Node
}
),
Error;
{error, Reason} = Error -> {error, Reason} = Error ->
?SLOG( ?SLOG(
error, error,

View File

@ -15,7 +15,7 @@
-import( -import(
emqx_eviction_agent_test_helpers, emqx_eviction_agent_test_helpers,
[emqtt_connect/0, emqtt_connect/1, emqtt_connect/2] [emqtt_connect/0, emqtt_connect/1, emqtt_connect/2, emqtt_connect_for_publish/1]
). ).
-define(assertPrinted(Printed, Code), -define(assertPrinted(Printed, Code),
@ -202,7 +202,7 @@ t_explicit_session_takeover(Config) ->
ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]), ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
{ok, C1} = emqtt_connect([{port, Port1}]), {ok, C1} = emqtt_connect_for_publish(Port1),
emqtt:publish(C1, <<"t1">>, <<"MessageToEvictedSession1">>), emqtt:publish(C1, <<"t1">>, <<"MessageToEvictedSession1">>),
ok = emqtt:disconnect(C1), ok = emqtt:disconnect(C1),
@ -229,7 +229,7 @@ t_explicit_session_takeover(Config) ->
ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]), ok = rpc:call(Node1, emqx_eviction_agent, disable, [test_eviction]),
%% Session is on Node2, but we connect to Node1 %% Session is on Node2, but we connect to Node1
{ok, C2} = emqtt_connect([{port, Port1}]), {ok, C2} = emqtt_connect_for_publish(Port1),
emqtt:publish(C2, <<"t1">>, <<"MessageToEvictedSession2">>), emqtt:publish(C2, <<"t1">>, <<"MessageToEvictedSession2">>),
ok = emqtt:disconnect(C2), ok = emqtt:disconnect(C2),

View File

@ -9,7 +9,6 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_channel.hrl"). -include_lib("emqx/include/emqx_channel.hrl").
-define(CLIENT_ID, <<"client_with_session">>). -define(CLIENT_ID, <<"client_with_session">>).

View File

@ -8,6 +8,7 @@
emqtt_connect/0, emqtt_connect/0,
emqtt_connect/1, emqtt_connect/1,
emqtt_connect/2, emqtt_connect/2,
emqtt_connect_for_publish/1,
emqtt_connect_many/2, emqtt_connect_many/2,
emqtt_connect_many/3, emqtt_connect_many/3,
stop_many/1, stop_many/1,
@ -42,6 +43,14 @@ emqtt_connect(Opts) ->
{error, _} = Error -> Error {error, _} = Error -> Error
end. end.
emqtt_connect_for_publish(Port) ->
ClientId = <<"pubclient-", (integer_to_binary(erlang:unique_integer([positive])))/binary>>,
{ok, C} = emqtt:start_link([{clientid, ClientId}, {port, Port}]),
case emqtt:connect(C) of
{ok, _} -> {ok, C};
{error, _} = Error -> Error
end.
emqtt_connect_many(Port, Count) -> emqtt_connect_many(Port, Count) ->
emqtt_connect_many(Port, Count, _StartN = 1). emqtt_connect_many(Port, Count, _StartN = 1).

View File

@ -0,0 +1 @@
During node evacuation, evacuate all disconnected sessions, not only those started with `clean_start` set to `false`.