From 140cda2f134d4d82b1794235743216c60d9e79de Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 23 Jan 2023 19:38:08 +0100 Subject: [PATCH 1/2] fix(emqx_connection): crash when idle_timeout is set to infinity --- apps/emqx/i18n/emqx_schema_i18n.conf | 11 ++++-- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_connection.erl | 9 +++-- apps/emqx/test/emqx_client_SUITE.erl | 52 ++++++++++++++++++++++++---- changes/v5.0.16/fix-9834.en.md | 1 + changes/v5.0.16/fix-9834.zh.md | 1 + 6 files changed, 64 insertions(+), 12 deletions(-) create mode 100644 changes/v5.0.16/fix-9834.en.md create mode 100644 changes/v5.0.16/fix-9834.zh.md diff --git a/apps/emqx/i18n/emqx_schema_i18n.conf b/apps/emqx/i18n/emqx_schema_i18n.conf index 0665cfb09..0ab781280 100644 --- a/apps/emqx/i18n/emqx_schema_i18n.conf +++ b/apps/emqx/i18n/emqx_schema_i18n.conf @@ -656,8 +656,15 @@ mqtt 下所有的配置作为全局的默认值存在,它可以被 zone< mqtt_idle_timeout { desc { - en: """After the TCP connection is established, if the MQTT CONNECT packet from the client is not received within the time specified by idle_timeout, the connection will be disconnected.""" - zh: """TCP 连接建立后,如果在 idle_timeout 指定的时间内未收到客户端的 MQTT CONNECT 报文,则连接将被断开。""" + en: """After the TCP connection is established, if the MQTT CONNECT packet from the client is +not received within the time specified by idle_timeout, the connection will be disconnected. +After the CONNECT packet has been accepted by EMQX, if the connection idles for this long time, +then the Erlang process is put to hibernation to save OS resources. Note: long idle_timeout +interval may impose risk at the system if large number of malicious clients only establish connections +but do not send any data.""" + zh: """TCP 连接建立后,如果在 idle_timeout 指定的时间内未收到客户端的 MQTT CONNECT 报文,则连接将被断开。 +如果连接在 CONNECT 报文被 EMQX 接受之后空闲超过该时长,那么服务这个连接的 Erlang 进程会进入休眠以节省系统资源。 +注意,该配置值如果设置过大的情况下,如果大量恶意客户端只连接,但不发任何数据,可能会导致系统资源被恶意消耗。""" } label: { en: """Idle Timeout""" diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index 40d2796cd..270d36a5e 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -3,7 +3,7 @@ {id, "emqx"}, {description, "EMQX Core"}, % strict semver, bump manually! - {vsn, "5.0.15"}, + {vsn, "5.0.16"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 0c6481399..5ed302a6f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -403,14 +403,19 @@ exit_on_sock_error(Reason) -> recvloop( Parent, State = #state{ - idle_timeout = IdleTimeout, + idle_timeout = IdleTimeout0, zone = Zone } ) -> + IdleTimeout = + case IdleTimeout0 of + infinity -> infinity; + _ -> IdleTimeout0 + 100 + end, receive Msg -> handle_recv(Msg, Parent, State) - after IdleTimeout + 100 -> + after IdleTimeout -> case emqx_olp:backoff_hibernation(Zone) of true -> recvloop(Parent, State); diff --git a/apps/emqx/test/emqx_client_SUITE.erl b/apps/emqx/test/emqx_client_SUITE.erl index 79c934b47..8effb60ef 100644 --- a/apps/emqx/test/emqx_client_SUITE.erl +++ b/apps/emqx/test/emqx_client_SUITE.erl @@ -43,6 +43,8 @@ <<"TopicA/#">> ]). +-define(WAIT(EXPR, ATTEMPTS), wait(fun() -> EXPR end, ATTEMPTS)). + all() -> [ {group, mqttv3}, @@ -85,6 +87,12 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(_Case, _Config) -> + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000). + %%-------------------------------------------------------------------- %% Test cases for MQTT v3 %%-------------------------------------------------------------------- @@ -101,16 +109,35 @@ t_basic_v4(_Config) -> t_cm(_) -> emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 1000), - ClientId = <<"myclient">>, + ClientId = atom_to_binary(?FUNCTION_NAME), {ok, C} = emqtt:start_link([{clientid, ClientId}]), {ok, _} = emqtt:connect(C), - ct:sleep(500), - #{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId), + ?WAIT(#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId), 2), emqtt:subscribe(C, <<"mytopic">>, 0), - ct:sleep(1200), - Stats = emqx_cm:get_chan_stats(ClientId), - ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)), - emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000). + ?WAIT( + begin + Stats = emqx_cm:get_chan_stats(ClientId), + ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)) + end, + 2 + ), + ok. + +t_idle_timeout_infinity(_) -> + emqx_config:put_zone_conf(default, [mqtt, idle_timeout], infinity), + ClientId = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(C), + ?WAIT(#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId), 2), + emqtt:subscribe(C, <<"mytopic">>, 0), + ?WAIT( + begin + Stats = emqx_cm:get_chan_stats(ClientId), + ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)) + end, + 2 + ), + ok. t_cm_registry(_) -> Children = supervisor:which_children(emqx_cm_sup), @@ -363,3 +390,14 @@ tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) -> #{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN), confirm_tls_version(Client, RequiredTLSVsn), emqtt:disconnect(Client). + +wait(F, 1) -> + F(); +wait(F, Attempts) when Attempts > 0 -> + try + F() + catch + _:_ -> + timer:sleep(1000), + wait(F, Attempts - 1) + end. diff --git a/changes/v5.0.16/fix-9834.en.md b/changes/v5.0.16/fix-9834.en.md new file mode 100644 index 000000000..d5ad1f67a --- /dev/null +++ b/changes/v5.0.16/fix-9834.en.md @@ -0,0 +1 @@ +Allow `mqtt.idle_timeout` to be set to `infinity` diff --git a/changes/v5.0.16/fix-9834.zh.md b/changes/v5.0.16/fix-9834.zh.md new file mode 100644 index 000000000..06eafc1a0 --- /dev/null +++ b/changes/v5.0.16/fix-9834.zh.md @@ -0,0 +1 @@ +允许配置项 `mqtt.idle_timeout` 设置成 `infinity` From 7575120ea694e0ebacb5b43bdfcc685ae3c29c7e Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 24 Jan 2023 10:54:20 +0100 Subject: [PATCH 2/2] test: use snabbkaffe retry macro --- apps/emqx/test/emqx_client_SUITE.erl | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/apps/emqx/test/emqx_client_SUITE.erl b/apps/emqx/test/emqx_client_SUITE.erl index 8effb60ef..2f433c73d 100644 --- a/apps/emqx/test/emqx_client_SUITE.erl +++ b/apps/emqx/test/emqx_client_SUITE.erl @@ -24,6 +24,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TOPICS, [ <<"TopicA">>, @@ -43,7 +44,7 @@ <<"TopicA/#">> ]). --define(WAIT(EXPR, ATTEMPTS), wait(fun() -> EXPR end, ATTEMPTS)). +-define(WAIT(EXPR, ATTEMPTS), ?retry(1000, ATTEMPTS, EXPR)). all() -> [ @@ -390,14 +391,3 @@ tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) -> #{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN), confirm_tls_version(Client, RequiredTLSVsn), emqtt:disconnect(Client). - -wait(F, 1) -> - F(); -wait(F, Attempts) when Attempts > 0 -> - try - F() - catch - _:_ -> - timer:sleep(1000), - wait(F, Attempts - 1) - end.