Merge pull request #9834 from zmstone/0123-fix-idle_timeout-infinity

fix(emqx_connection): crash when idle_timeout is set to infinity
This commit is contained in:
Zaiming (Stone) Shi 2023-01-24 16:05:09 +01:00 committed by GitHub
commit e5b65087af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 54 additions and 12 deletions

View File

@ -656,8 +656,15 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 <code>zone<
mqtt_idle_timeout { mqtt_idle_timeout {
desc { desc {
en: """After the TCP connection is established, if the MQTT CONNECT packet from the client is not received within the time specified by <code>idle_timeout</code>, the connection will be disconnected.""" en: """After the TCP connection is established, if the MQTT CONNECT packet from the client is
zh: """TCP 连接建立后,如果在 <code>idle_timeout</code> 指定的时间内未收到客户端的 MQTT CONNECT 报文,则连接将被断开。""" not received within the time specified by <code>idle_timeout</code>, the connection will be disconnected.
After the CONNECT packet has been accepted by EMQX, if the connection idles for this long time,
then the Erlang process is put to hibernation to save OS resources. Note: long <code>idle_timeout</code>
interval may impose risk at the system if large number of malicious clients only establish connections
but do not send any data."""
zh: """TCP 连接建立后,如果在 <code>idle_timeout</code> 指定的时间内未收到客户端的 MQTT CONNECT 报文,则连接将被断开。
如果连接在 CONNECT 报文被 EMQX 接受之后空闲超过该时长,那么服务这个连接的 Erlang 进程会进入休眠以节省系统资源。
注意,该配置值如果设置过大的情况下,如果大量恶意客户端只连接,但不发任何数据,可能会导致系统资源被恶意消耗。"""
} }
label: { label: {
en: """Idle Timeout""" en: """Idle Timeout"""

View File

@ -3,7 +3,7 @@
{id, "emqx"}, {id, "emqx"},
{description, "EMQX Core"}, {description, "EMQX Core"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.15"}, {vsn, "5.0.16"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ {applications, [

View File

@ -403,14 +403,19 @@ exit_on_sock_error(Reason) ->
recvloop( recvloop(
Parent, Parent,
State = #state{ State = #state{
idle_timeout = IdleTimeout, idle_timeout = IdleTimeout0,
zone = Zone zone = Zone
} }
) -> ) ->
IdleTimeout =
case IdleTimeout0 of
infinity -> infinity;
_ -> IdleTimeout0 + 100
end,
receive receive
Msg -> Msg ->
handle_recv(Msg, Parent, State) handle_recv(Msg, Parent, State)
after IdleTimeout + 100 -> after IdleTimeout ->
case emqx_olp:backoff_hibernation(Zone) of case emqx_olp:backoff_hibernation(Zone) of
true -> true ->
recvloop(Parent, State); recvloop(Parent, State);

View File

@ -24,6 +24,7 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(TOPICS, [ -define(TOPICS, [
<<"TopicA">>, <<"TopicA">>,
@ -43,6 +44,8 @@
<<"TopicA/#">> <<"TopicA/#">>
]). ]).
-define(WAIT(EXPR, ATTEMPTS), ?retry(1000, ATTEMPTS, EXPR)).
all() -> all() ->
[ [
{group, mqttv3}, {group, mqttv3},
@ -85,6 +88,12 @@ init_per_suite(Config) ->
end_per_suite(_Config) -> end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_common_test_helpers:stop_apps([]).
init_per_testcase(_Case, Config) ->
Config.
end_per_testcase(_Case, _Config) ->
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases for MQTT v3 %% Test cases for MQTT v3
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -101,16 +110,35 @@ t_basic_v4(_Config) ->
t_cm(_) -> t_cm(_) ->
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000), emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000),
ClientId = <<"myclient">>, ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
ct:sleep(500), ?WAIT(#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId), 2),
#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId),
emqtt:subscribe(C, <<"mytopic">>, 0), emqtt:subscribe(C, <<"mytopic">>, 0),
ct:sleep(1200), ?WAIT(
begin
Stats = emqx_cm:get_chan_stats(ClientId), Stats = emqx_cm:get_chan_stats(ClientId),
?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)), ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats))
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000). end,
2
),
ok.
t_idle_timeout_infinity(_) ->
emqx_config:put_zone_conf(default, [mqtt, idle_timeout], infinity),
ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
?WAIT(#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId), 2),
emqtt:subscribe(C, <<"mytopic">>, 0),
?WAIT(
begin
Stats = emqx_cm:get_chan_stats(ClientId),
?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats))
end,
2
),
ok.
t_cm_registry(_) -> t_cm_registry(_) ->
Children = supervisor:which_children(emqx_cm_sup), Children = supervisor:which_children(emqx_cm_sup),

View File

@ -0,0 +1 @@
Allow `mqtt.idle_timeout` to be set to `infinity`

View File

@ -0,0 +1 @@
允许配置项 `mqtt.idle_timeout` 设置成 `infinity`