diff --git a/priv/emqx.schema b/priv/emqx.schema index 9e55579df..3b57c80e0 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1767,6 +1767,7 @@ end}. %% @doc Max message queue length for connection/session process. %% NOTE: Message queue here is the Erlang process mailbox, but not %% the number of MQTT queued messages. +%% Set to 0 or negative to have it disabled {mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ {default, 100000}, {datatype, integer} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e2c8fdeed..6a56b76e9 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -214,9 +214,17 @@ handle_info({timeout, Timer, emit_stats}, proto_state = ProtoState }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), - ok = emqx_gc:reset(), - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case meqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info(timeout, State) -> shutdown(idle_timeout, State); diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index e2b60a156..c3cec7427 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -59,3 +59,42 @@ proc_stats(Pid) -> {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), [{mailbox_len, V} | Stats1]. +-define(DISABLED, 0). + +%% @doc Check self() process status against connection/session process management policy, +%% return `continue | hibernate | {shutdown, Reason}' accordingly. +%% `continue': There is nothing out of the ordinary +%% `hibernate': Nothing to process in my mailbox (and since this check is triggered +%% by a timer, we assume it is a fat chance to continue idel, hence hibernate. +%% `shutdown': Some numbers (message queue length or heap size have hit the limit, +%% hence shutdown for greater good (system stability). +-spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy() -> + MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED), + Qlength = proc_info(message_queue_len), + Checks = + [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end, + {shutdown, message_queue_too_long}}, + {fun() -> is_heap_size_too_large() end, + {shutdown, total_heap_size_too_large}}, + {fun() -> Qlength > 0 end, continue}, + {fun() -> true end, hibernate} + ], + check(Checks). + +check([{Pred, Result} | Rest]) -> + case Pred() of + true -> Result; + false -> check(Rest) + end. + +is_heap_size_too_large() -> + MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED), + is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize. + +is_enabled(Max) -> Max > ?DISABLED. + +proc_info(Key) -> + {Key, Value} = erlang:process_info(self(), Key), + Value. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index eab657eb6..2381d7c3b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -574,9 +574,17 @@ handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), - ok = emqx_gc:reset(), %% going to hibernate, reset gc stats - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case emqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl new file mode 100644 index 000000000..775887439 --- /dev/null +++ b/test/emqx_misc_tests.erl @@ -0,0 +1,54 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_misc_tests). +-include_lib("eunit/include/eunit.hrl"). + +shutdown_disabled_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy()) + end). + +message_queue_too_long_test() -> + with_env( + [{conn_max_msg_queue_len, 1}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy()), + receive bar -> ok end + end). + +total_heap_size_too_large_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 1}], + fun() -> + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy()) + end). + +with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F). + +conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy(). diff --git a/test/emqx_test_lib.erl b/test/emqx_test_lib.erl new file mode 100644 index 000000000..dfb98d598 --- /dev/null +++ b/test/emqx_test_lib.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_test_lib). + +-export([ with_env/2 + ]). + +with_env([], F) -> F(); +with_env([{Key, Val} | Rest], F) -> + Origin = get_env(Key), + try + ok = set_env(Key, Val), + with_env(Rest, F) + after + case Origin of + undefined -> ok = unset_env(Key); + _ -> ok = set_env(Key, Origin) + end + end. + +get_env(Key) -> application:get_env(?APPLICATION, Key). +set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val). +unset_env(Key) -> application:unset_env(?APPLICATION, Key). + +