From 81d575c3c1863c5d878b3e1a203493ca7c6885e2 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 21:44:25 +0200 Subject: [PATCH 01/11] Add new shutdown-policy config schemas --- priv/emqx.schema | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/priv/emqx.schema b/priv/emqx.schema index 9fcd97376..40ace9f93 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1764,3 +1764,18 @@ end}. end}. +%% @doc Max message queue length for connection/session process. +%% NOTE: Message queue here is the Erlang process mailbox, but not +%% the number of MQTT queued messages. +{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ + {default, 100000}, + {datatype, integer} +]}. + +%% @doc Max total heap size to kill connection/session process. +%% set to 0 or negative number to have it disabled. +{mapping, "mqtt.conn.max_total_heap_size", "emqx.conn_max_total_heap_size", [ + {default, -1}, %% disabled by default + {datatype, integer} +]}. + From 25b61afe0dbc81e3f4057b48e8b22001de528f66 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 22:53:37 +0200 Subject: [PATCH 02/11] Add connection/session shutdown policy The hibernation behaviour is also changed (implicitly) in this commit: Prior to this change, connection/session always hibernates after the stats timer expires regardless of messages in mailbox. After this commit, connection/session process only goes to hibernate when the timer expires AND there is nothing left in the mailbox to process --- priv/emqx.schema | 1 + src/emqx_connection.erl | 14 ++++++++--- src/emqx_misc.erl | 41 +++++++++++++++++++++++++++++- src/emqx_session.erl | 14 ++++++++--- test/emqx_misc_tests.erl | 54 ++++++++++++++++++++++++++++++++++++++++ test/emqx_test_lib.erl | 37 +++++++++++++++++++++++++++ 6 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 test/emqx_misc_tests.erl create mode 100644 test/emqx_test_lib.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 40ace9f93..99e489fd8 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1767,6 +1767,7 @@ end}. %% @doc Max message queue length for connection/session process. %% NOTE: Message queue here is the Erlang process mailbox, but not %% the number of MQTT queued messages. +%% Set to 0 or negative to have it disabled {mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ {default, 100000}, {datatype, integer} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index d9d67f08d..63a344944 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -214,9 +214,17 @@ handle_info({timeout, Timer, emit_stats}, proto_state = ProtoState }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), - ok = emqx_gc:reset(), - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case meqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info(timeout, State) -> shutdown(idle_timeout, State); diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index e2b60a156..c3cec7427 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -59,3 +59,42 @@ proc_stats(Pid) -> {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), [{mailbox_len, V} | Stats1]. +-define(DISABLED, 0). + +%% @doc Check self() process status against connection/session process management policy, +%% return `continue | hibernate | {shutdown, Reason}' accordingly. +%% `continue': There is nothing out of the ordinary +%% `hibernate': Nothing to process in my mailbox (and since this check is triggered +%% by a timer, we assume it is a fat chance to continue idel, hence hibernate. +%% `shutdown': Some numbers (message queue length or heap size have hit the limit, +%% hence shutdown for greater good (system stability). +-spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy() -> + MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED), + Qlength = proc_info(message_queue_len), + Checks = + [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end, + {shutdown, message_queue_too_long}}, + {fun() -> is_heap_size_too_large() end, + {shutdown, total_heap_size_too_large}}, + {fun() -> Qlength > 0 end, continue}, + {fun() -> true end, hibernate} + ], + check(Checks). + +check([{Pred, Result} | Rest]) -> + case Pred() of + true -> Result; + false -> check(Rest) + end. + +is_heap_size_too_large() -> + MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED), + is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize. + +is_enabled(Max) -> Max > ?DISABLED. + +proc_info(Key) -> + {Key, Value} = erlang:process_info(self(), Key), + Value. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 54dcfb1fe..64ed698bd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -574,9 +574,17 @@ handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), - ok = emqx_gc:reset(), %% going to hibernate, reset gc stats - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case emqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl new file mode 100644 index 000000000..775887439 --- /dev/null +++ b/test/emqx_misc_tests.erl @@ -0,0 +1,54 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_misc_tests). +-include_lib("eunit/include/eunit.hrl"). + +shutdown_disabled_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy()) + end). + +message_queue_too_long_test() -> + with_env( + [{conn_max_msg_queue_len, 1}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy()), + receive bar -> ok end + end). + +total_heap_size_too_large_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 1}], + fun() -> + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy()) + end). + +with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F). + +conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy(). diff --git a/test/emqx_test_lib.erl b/test/emqx_test_lib.erl new file mode 100644 index 000000000..dfb98d598 --- /dev/null +++ b/test/emqx_test_lib.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_test_lib). + +-export([ with_env/2 + ]). + +with_env([], F) -> F(); +with_env([{Key, Val} | Rest], F) -> + Origin = get_env(Key), + try + ok = set_env(Key, Val), + with_env(Rest, F) + after + case Origin of + undefined -> ok = unset_env(Key); + _ -> ok = set_env(Key, Origin) + end + end. + +get_env(Key) -> application:get_env(?APPLICATION, Key). +set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val). +unset_env(Key) -> application:unset_env(?APPLICATION, Key). + + From f75a6241977e761f96e48a6d7fb0200f7e3cd1f4 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 23:20:40 +0200 Subject: [PATCH 03/11] Add a test case to cover timeout message flush in emqx_misc --- src/emqx_misc.erl | 9 ++++----- test/emqx_misc_tests.erl | 7 +++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index c3cec7427..d9e96b9c9 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) -> erlang:start_timer(Interval, Dest, Msg). -spec(cancel_timer(undefined | reference()) -> ok). -cancel_timer(undefined) -> - ok; -cancel_timer(Timer) -> - case catch erlang:cancel_timer(Timer) of +cancel_timer(Timer) when is_reference(Timer) -> + case erlang:cancel_timer(Timer) of false -> receive {timeout, Timer, _} -> ok after 0 -> ok end; _ -> ok - end. + end; +cancel_timer(_) -> ok. -spec(proc_name(atom(), pos_integer()) -> atom()). proc_name(Mod, Id) -> diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index 775887439..a4abac683 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -15,6 +15,13 @@ -module(emqx_misc_tests). -include_lib("eunit/include/eunit.hrl"). +timer_cancel_flush_test() -> + Timer = emqx_misc:start_timer(0, foo), + ok = emqx_misc:cancel_timer(Timer), + receive {timeout, Timer, foo} -> error(unexpected) + after 0 -> ok + end. + shutdown_disabled_test() -> with_env( [{conn_max_msg_queue_len, 0}, From f58165db73a36d93cbd967370e8e4e5aa8d74061 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Wed, 19 Sep 2018 21:39:26 +0200 Subject: [PATCH 04/11] Move shutdown policy config to zone configs --- priv/emqx.schema | 34 +++++++++++++-------------- src/emqx_connection.erl | 4 +++- src/emqx_misc.erl | 36 +++++++++++++++++----------- src/emqx_session.erl | 4 +++- test/emqx_misc_tests.erl | 47 +++++++++++++------------------------ test/emqx_session_SUITE.erl | 2 +- test/emqx_test_lib.erl | 37 ----------------------------- 7 files changed, 61 insertions(+), 103 deletions(-) delete mode 100644 test/emqx_test_lib.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 99e489fd8..ed545b7ec 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -828,10 +828,21 @@ end}. %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. {mapping, "zone.$name.force_gc_policy", "emqx.zones", [ - {default, "0|0"}, + {default, "0 | 0"}, {datatype, string} ]}. +%% @doc Max message queue length and total heap size to force shutdown +%% connection/session process. +%% Message queue here is the Erlang process mailbox, but not the number +%% of queued MQTT messages of QoS 1 and 2. +%% Total heap size is the in Erlang 'words' not in 'bytes'. +%% Zero or negative is to disable. +{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ + {default, "0 | 0"}, + {datatype, string} +]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; @@ -843,6 +854,10 @@ end}. [Count, Bytes] = string:tokens(Val, "| "), {force_gc_policy, #{count => list_to_integer(Count), bytes => list_to_integer(Bytes)}}; + ("force_shutdown_policy", Val) -> + [Len, Siz] = string:tokens(Val, "| "), + {force_shutdown_policy, #{message_queue_len => list_to_integer(Len), + total_heap_size => list_to_integer(Siz)}}; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1763,20 +1778,3 @@ end}. {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - -%% @doc Max message queue length for connection/session process. -%% NOTE: Message queue here is the Erlang process mailbox, but not -%% the number of MQTT queued messages. -%% Set to 0 or negative to have it disabled -{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ - {default, 100000}, - {datatype, integer} -]}. - -%% @doc Max total heap size to kill connection/session process. -%% set to 0 or negative number to have it disabled. -{mapping, "mqtt.conn.max_total_heap_size", "emqx.conn_max_total_heap_size", [ - {default, -1}, %% disabled by default - {datatype, integer} -]}. - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 63a344944..376fe5a9a 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -152,6 +152,7 @@ init([Transport, RawSocket, Options]) -> }), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -215,7 +216,8 @@ handle_info({timeout, Timer, emit_stats}, }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NewState = State#state{stats_timer = undefined}, - case meqx_misc:conn_proc_mng_policy() of + Limits = erlang:get(force_shutdown_policy), + case meqx_misc:conn_proc_mng_policy(Limits) of continue -> {noreply, NewState}; hibernate -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d9e96b9c9..656b0fca3 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -62,24 +62,30 @@ proc_stats(Pid) -> %% @doc Check self() process status against connection/session process management policy, %% return `continue | hibernate | {shutdown, Reason}' accordingly. -%% `continue': There is nothing out of the ordinary -%% `hibernate': Nothing to process in my mailbox (and since this check is triggered +%% `continue': There is nothing out of the ordinary. +%% `hibernate': Nothing to process in my mailbox, and since this check is triggered %% by a timer, we assume it is a fat chance to continue idel, hence hibernate. -%% `shutdown': Some numbers (message queue length or heap size have hit the limit, +%% `shutdown': Some numbers (message queue length or heap size have hit the limit), %% hence shutdown for greater good (system stability). --spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}). -conn_proc_mng_policy() -> - MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED), +-spec(conn_proc_mng_policy(#{message_queue_len := integer(), + total_heap_size := integer() + } | undefined) -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen, + total_heap_size := MaxTotalHeapSize + }) -> Qlength = proc_info(message_queue_len), Checks = - [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end, + [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end, {shutdown, message_queue_too_long}}, - {fun() -> is_heap_size_too_large() end, + {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end, {shutdown, total_heap_size_too_large}}, {fun() -> Qlength > 0 end, continue}, {fun() -> true end, hibernate} ], - check(Checks). + check(Checks); +conn_proc_mng_policy(_) -> + %% disable by default + conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}). check([{Pred, Result} | Rest]) -> case Pred() of @@ -87,11 +93,13 @@ check([{Pred, Result} | Rest]) -> false -> check(Rest) end. -is_heap_size_too_large() -> - MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED), - is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize. +is_message_queue_too_long(Qlength, Max) -> + is_enabled(Max) andalso Qlength > Max. -is_enabled(Max) -> Max > ?DISABLED. +is_heap_size_too_large(Max) -> + is_enabled(Max) andalso proc_info(total_heap_size) > Max. + +is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 64ed698bd..4d52afefc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -357,6 +357,7 @@ init([Parent, #{zone := Zone, emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -575,7 +576,8 @@ handle_info({timeout, Timer, emit_stats}, stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, - case emqx_misc:conn_proc_mng_policy() of + Limits = erlang:get(force_shutdown_policy), + case emqx_misc:conn_proc_mng_policy(Limits) of continue -> {noreply, NewState}; hibernate -> diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index a4abac683..40a9aae87 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -23,39 +23,24 @@ timer_cancel_flush_test() -> end. shutdown_disabled_test() -> - with_env( - [{conn_max_msg_queue_len, 0}, - {conn_max_total_heap_size, 0}], - fun() -> - self() ! foo, - ?assertEqual(continue, conn_proc_mng_policy()), - receive foo -> ok end, - ?assertEqual(hibernate, conn_proc_mng_policy()) - end). + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy(0, 0)), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)). message_queue_too_long_test() -> - with_env( - [{conn_max_msg_queue_len, 1}, - {conn_max_total_heap_size, 0}], - fun() -> - self() ! foo, - self() ! bar, - ?assertEqual({shutdown, message_queue_too_long}, - conn_proc_mng_policy()), - receive foo -> ok end, - ?assertEqual(continue, conn_proc_mng_policy()), - receive bar -> ok end - end). + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy(1, 0)), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy(1, 0)), + receive bar -> ok end. total_heap_size_too_large_test() -> - with_env( - [{conn_max_msg_queue_len, 0}, - {conn_max_total_heap_size, 1}], - fun() -> - ?assertEqual({shutdown, total_heap_size_too_large}, - conn_proc_mng_policy()) - end). + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy(0, 1)). -with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F). - -conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy(). +conn_proc_mng_policy(L, S) -> + emqx_misc:conn_proc_mng_policy(#{message_queue_len => L, + total_heap_size => S}). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 29a6edc61..2b60b747d 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -25,7 +25,7 @@ all() -> [t_session_all]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), Config. - + end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). diff --git a/test/emqx_test_lib.erl b/test/emqx_test_lib.erl deleted file mode 100644 index dfb98d598..000000000 --- a/test/emqx_test_lib.erl +++ /dev/null @@ -1,37 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_test_lib). - --export([ with_env/2 - ]). - -with_env([], F) -> F(); -with_env([{Key, Val} | Rest], F) -> - Origin = get_env(Key), - try - ok = set_env(Key, Val), - with_env(Rest, F) - after - case Origin of - undefined -> ok = unset_env(Key); - _ -> ok = set_env(Key, Origin) - end - end. - -get_env(Key) -> application:get_env(?APPLICATION, Key). -set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val). -unset_env(Key) -> application:unset_env(?APPLICATION, Key). - - From f9f09f66ddd61d98a51d5def15a80694b9c61a30 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 21:44:25 +0200 Subject: [PATCH 05/11] Add new shutdown-policy config schemas --- priv/emqx.schema | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/priv/emqx.schema b/priv/emqx.schema index 261c1981a..9e55579df 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1764,3 +1764,18 @@ end}. end}. +%% @doc Max message queue length for connection/session process. +%% NOTE: Message queue here is the Erlang process mailbox, but not +%% the number of MQTT queued messages. +{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ + {default, 100000}, + {datatype, integer} +]}. + +%% @doc Max total heap size to kill connection/session process. +%% set to 0 or negative number to have it disabled. +{mapping, "mqtt.conn.max_total_heap_size", "emqx.conn_max_total_heap_size", [ + {default, -1}, %% disabled by default + {datatype, integer} +]}. + From 6fca651a8489049eaab65342b0a9a026ac40a472 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 22:53:37 +0200 Subject: [PATCH 06/11] Add connection/session shutdown policy The hibernation behaviour is also changed (implicitly) in this commit: Prior to this change, connection/session always hibernates after the stats timer expires regardless of messages in mailbox. After this commit, connection/session process only goes to hibernate when the timer expires AND there is nothing left in the mailbox to process --- priv/emqx.schema | 1 + src/emqx_connection.erl | 14 ++++++++--- src/emqx_misc.erl | 41 +++++++++++++++++++++++++++++- src/emqx_session.erl | 14 ++++++++--- test/emqx_misc_tests.erl | 54 ++++++++++++++++++++++++++++++++++++++++ test/emqx_test_lib.erl | 37 +++++++++++++++++++++++++++ 6 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 test/emqx_misc_tests.erl create mode 100644 test/emqx_test_lib.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 9e55579df..3b57c80e0 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1767,6 +1767,7 @@ end}. %% @doc Max message queue length for connection/session process. %% NOTE: Message queue here is the Erlang process mailbox, but not %% the number of MQTT queued messages. +%% Set to 0 or negative to have it disabled {mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ {default, 100000}, {datatype, integer} diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e2c8fdeed..6a56b76e9 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -214,9 +214,17 @@ handle_info({timeout, Timer, emit_stats}, proto_state = ProtoState }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), - ok = emqx_gc:reset(), - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case meqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info(timeout, State) -> shutdown(idle_timeout, State); diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index e2b60a156..c3cec7427 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -59,3 +59,42 @@ proc_stats(Pid) -> {value, {_, V}, Stats1} = lists:keytake(message_queue_len, 1, Stats), [{mailbox_len, V} | Stats1]. +-define(DISABLED, 0). + +%% @doc Check self() process status against connection/session process management policy, +%% return `continue | hibernate | {shutdown, Reason}' accordingly. +%% `continue': There is nothing out of the ordinary +%% `hibernate': Nothing to process in my mailbox (and since this check is triggered +%% by a timer, we assume it is a fat chance to continue idel, hence hibernate. +%% `shutdown': Some numbers (message queue length or heap size have hit the limit, +%% hence shutdown for greater good (system stability). +-spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy() -> + MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED), + Qlength = proc_info(message_queue_len), + Checks = + [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end, + {shutdown, message_queue_too_long}}, + {fun() -> is_heap_size_too_large() end, + {shutdown, total_heap_size_too_large}}, + {fun() -> Qlength > 0 end, continue}, + {fun() -> true end, hibernate} + ], + check(Checks). + +check([{Pred, Result} | Rest]) -> + case Pred() of + true -> Result; + false -> check(Rest) + end. + +is_heap_size_too_large() -> + MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED), + is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize. + +is_enabled(Max) -> Max > ?DISABLED. + +proc_info(Key) -> + {Key, Value} = erlang:process_info(self(), Key), + Value. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index eab657eb6..2381d7c3b 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -574,9 +574,17 @@ handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), - ok = emqx_gc:reset(), %% going to hibernate, reset gc stats - {noreply, State#state{stats_timer = undefined}, hibernate}; - + NewState = State#state{stats_timer = undefined}, + case emqx_misc:conn_proc_mng_policy() of + continue -> + {noreply, NewState}; + hibernate -> + ok = emqx_gc:reset(), %% going to hibernate, reset gc stats + {noreply, NewState, hibernate}; + {shutdown, Reason} -> + ?LOG(warning, "shutdown due to ~p", [Reason], NewState), + shutdown(Reason, NewState) + end; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl new file mode 100644 index 000000000..775887439 --- /dev/null +++ b/test/emqx_misc_tests.erl @@ -0,0 +1,54 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_misc_tests). +-include_lib("eunit/include/eunit.hrl"). + +shutdown_disabled_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy()) + end). + +message_queue_too_long_test() -> + with_env( + [{conn_max_msg_queue_len, 1}, + {conn_max_total_heap_size, 0}], + fun() -> + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy()), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy()), + receive bar -> ok end + end). + +total_heap_size_too_large_test() -> + with_env( + [{conn_max_msg_queue_len, 0}, + {conn_max_total_heap_size, 1}], + fun() -> + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy()) + end). + +with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F). + +conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy(). diff --git a/test/emqx_test_lib.erl b/test/emqx_test_lib.erl new file mode 100644 index 000000000..dfb98d598 --- /dev/null +++ b/test/emqx_test_lib.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_test_lib). + +-export([ with_env/2 + ]). + +with_env([], F) -> F(); +with_env([{Key, Val} | Rest], F) -> + Origin = get_env(Key), + try + ok = set_env(Key, Val), + with_env(Rest, F) + after + case Origin of + undefined -> ok = unset_env(Key); + _ -> ok = set_env(Key, Origin) + end + end. + +get_env(Key) -> application:get_env(?APPLICATION, Key). +set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val). +unset_env(Key) -> application:unset_env(?APPLICATION, Key). + + From f70d16e3879da89d0704ae19e76c9ddf74458d02 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Tue, 18 Sep 2018 23:20:40 +0200 Subject: [PATCH 07/11] Add a test case to cover timeout message flush in emqx_misc --- src/emqx_misc.erl | 9 ++++----- test/emqx_misc_tests.erl | 7 +++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index c3cec7427..d9e96b9c9 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -36,14 +36,13 @@ start_timer(Interval, Dest, Msg) -> erlang:start_timer(Interval, Dest, Msg). -spec(cancel_timer(undefined | reference()) -> ok). -cancel_timer(undefined) -> - ok; -cancel_timer(Timer) -> - case catch erlang:cancel_timer(Timer) of +cancel_timer(Timer) when is_reference(Timer) -> + case erlang:cancel_timer(Timer) of false -> receive {timeout, Timer, _} -> ok after 0 -> ok end; _ -> ok - end. + end; +cancel_timer(_) -> ok. -spec(proc_name(atom(), pos_integer()) -> atom()). proc_name(Mod, Id) -> diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index 775887439..a4abac683 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -15,6 +15,13 @@ -module(emqx_misc_tests). -include_lib("eunit/include/eunit.hrl"). +timer_cancel_flush_test() -> + Timer = emqx_misc:start_timer(0, foo), + ok = emqx_misc:cancel_timer(Timer), + receive {timeout, Timer, foo} -> error(unexpected) + after 0 -> ok + end. + shutdown_disabled_test() -> with_env( [{conn_max_msg_queue_len, 0}, From b61615323b09d3f64776ab09f9ea99d67e7e7466 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Wed, 19 Sep 2018 21:39:26 +0200 Subject: [PATCH 08/11] Move shutdown policy config to zone configs --- priv/emqx.schema | 34 +++++++++++++-------------- src/emqx_connection.erl | 4 +++- src/emqx_misc.erl | 36 +++++++++++++++++----------- src/emqx_session.erl | 4 +++- test/emqx_misc_tests.erl | 47 +++++++++++++------------------------ test/emqx_session_SUITE.erl | 2 +- test/emqx_test_lib.erl | 37 ----------------------------- 7 files changed, 61 insertions(+), 103 deletions(-) delete mode 100644 test/emqx_test_lib.erl diff --git a/priv/emqx.schema b/priv/emqx.schema index 3b57c80e0..9fe26ead3 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -828,10 +828,21 @@ end}. %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. {mapping, "zone.$name.force_gc_policy", "emqx.zones", [ - {default, "0|0"}, + {default, "0 | 0"}, {datatype, string} ]}. +%% @doc Max message queue length and total heap size to force shutdown +%% connection/session process. +%% Message queue here is the Erlang process mailbox, but not the number +%% of queued MQTT messages of QoS 1 and 2. +%% Total heap size is the in Erlang 'words' not in 'bytes'. +%% Zero or negative is to disable. +{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ + {default, "0 | 0"}, + {datatype, string} +]}. + {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {mqtt_retain_available, Val}; @@ -843,6 +854,10 @@ end}. [Count, Bytes] = string:tokens(Val, "| "), {force_gc_policy, #{count => list_to_integer(Count), bytes => list_to_integer(Bytes)}}; + ("force_shutdown_policy", Val) -> + [Len, Siz] = string:tokens(Val, "| "), + {force_shutdown_policy, #{message_queue_len => list_to_integer(Len), + total_heap_size => list_to_integer(Siz)}}; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1763,20 +1778,3 @@ end}. {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. - -%% @doc Max message queue length for connection/session process. -%% NOTE: Message queue here is the Erlang process mailbox, but not -%% the number of MQTT queued messages. -%% Set to 0 or negative to have it disabled -{mapping, "mqtt.conn.max_msg_queue_len", "emqx.conn_max_msg_queue_len", [ - {default, 100000}, - {datatype, integer} -]}. - -%% @doc Max total heap size to kill connection/session process. -%% set to 0 or negative number to have it disabled. -{mapping, "mqtt.conn.max_total_heap_size", "emqx.conn_max_total_heap_size", [ - {default, -1}, %% disabled by default - {datatype, integer} -]}. - diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 6a56b76e9..4290ae0d9 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -152,6 +152,7 @@ init([Transport, RawSocket, Options]) -> }), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> @@ -215,7 +216,8 @@ handle_info({timeout, Timer, emit_stats}, }) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NewState = State#state{stats_timer = undefined}, - case meqx_misc:conn_proc_mng_policy() of + Limits = erlang:get(force_shutdown_policy), + case meqx_misc:conn_proc_mng_policy(Limits) of continue -> {noreply, NewState}; hibernate -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d9e96b9c9..656b0fca3 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,7 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/0]). + proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -62,24 +62,30 @@ proc_stats(Pid) -> %% @doc Check self() process status against connection/session process management policy, %% return `continue | hibernate | {shutdown, Reason}' accordingly. -%% `continue': There is nothing out of the ordinary -%% `hibernate': Nothing to process in my mailbox (and since this check is triggered +%% `continue': There is nothing out of the ordinary. +%% `hibernate': Nothing to process in my mailbox, and since this check is triggered %% by a timer, we assume it is a fat chance to continue idel, hence hibernate. -%% `shutdown': Some numbers (message queue length or heap size have hit the limit, +%% `shutdown': Some numbers (message queue length or heap size have hit the limit), %% hence shutdown for greater good (system stability). --spec(conn_proc_mng_policy() -> continue | hibernate | {shutdown, _}). -conn_proc_mng_policy() -> - MaxMsgQueueLen = application:get_env(?APPLICATION, conn_max_msg_queue_len, ?DISABLED), +-spec(conn_proc_mng_policy(#{message_queue_len := integer(), + total_heap_size := integer() + } | undefined) -> continue | hibernate | {shutdown, _}). +conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen, + total_heap_size := MaxTotalHeapSize + }) -> Qlength = proc_info(message_queue_len), Checks = - [{fun() -> is_enabled(MaxMsgQueueLen) andalso Qlength > MaxMsgQueueLen end, + [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end, {shutdown, message_queue_too_long}}, - {fun() -> is_heap_size_too_large() end, + {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end, {shutdown, total_heap_size_too_large}}, {fun() -> Qlength > 0 end, continue}, {fun() -> true end, hibernate} ], - check(Checks). + check(Checks); +conn_proc_mng_policy(_) -> + %% disable by default + conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}). check([{Pred, Result} | Rest]) -> case Pred() of @@ -87,11 +93,13 @@ check([{Pred, Result} | Rest]) -> false -> check(Rest) end. -is_heap_size_too_large() -> - MaxTotalHeapSize = application:get_env(?APPLICATION, conn_max_total_heap_size, ?DISABLED), - is_enabled(MaxTotalHeapSize) andalso proc_info(total_heap_size) > MaxTotalHeapSize. +is_message_queue_too_long(Qlength, Max) -> + is_enabled(Max) andalso Qlength > Max. -is_enabled(Max) -> Max > ?DISABLED. +is_heap_size_too_large(Max) -> + is_enabled(Max) andalso proc_info(total_heap_size) > Max. + +is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 2381d7c3b..0a798f0c4 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -357,6 +357,7 @@ init([Parent, #{zone := Zone, emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), + erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). @@ -575,7 +576,8 @@ handle_info({timeout, Timer, emit_stats}, stats_timer = Timer}) -> _ = emqx_sm:set_session_stats(ClientId, stats(State)), NewState = State#state{stats_timer = undefined}, - case emqx_misc:conn_proc_mng_policy() of + Limits = erlang:get(force_shutdown_policy), + case emqx_misc:conn_proc_mng_policy(Limits) of continue -> {noreply, NewState}; hibernate -> diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index a4abac683..40a9aae87 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -23,39 +23,24 @@ timer_cancel_flush_test() -> end. shutdown_disabled_test() -> - with_env( - [{conn_max_msg_queue_len, 0}, - {conn_max_total_heap_size, 0}], - fun() -> - self() ! foo, - ?assertEqual(continue, conn_proc_mng_policy()), - receive foo -> ok end, - ?assertEqual(hibernate, conn_proc_mng_policy()) - end). + self() ! foo, + ?assertEqual(continue, conn_proc_mng_policy(0, 0)), + receive foo -> ok end, + ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)). message_queue_too_long_test() -> - with_env( - [{conn_max_msg_queue_len, 1}, - {conn_max_total_heap_size, 0}], - fun() -> - self() ! foo, - self() ! bar, - ?assertEqual({shutdown, message_queue_too_long}, - conn_proc_mng_policy()), - receive foo -> ok end, - ?assertEqual(continue, conn_proc_mng_policy()), - receive bar -> ok end - end). + self() ! foo, + self() ! bar, + ?assertEqual({shutdown, message_queue_too_long}, + conn_proc_mng_policy(1, 0)), + receive foo -> ok end, + ?assertEqual(continue, conn_proc_mng_policy(1, 0)), + receive bar -> ok end. total_heap_size_too_large_test() -> - with_env( - [{conn_max_msg_queue_len, 0}, - {conn_max_total_heap_size, 1}], - fun() -> - ?assertEqual({shutdown, total_heap_size_too_large}, - conn_proc_mng_policy()) - end). + ?assertEqual({shutdown, total_heap_size_too_large}, + conn_proc_mng_policy(0, 1)). -with_env(Envs, F) -> emqx_test_lib:with_env(Envs, F). - -conn_proc_mng_policy() -> emqx_misc:conn_proc_mng_policy(). +conn_proc_mng_policy(L, S) -> + emqx_misc:conn_proc_mng_policy(#{message_queue_len => L, + total_heap_size => S}). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 29a6edc61..2b60b747d 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -25,7 +25,7 @@ all() -> [t_session_all]. init_per_suite(Config) -> emqx_ct_broker_helpers:run_setup_steps(), Config. - + end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). diff --git a/test/emqx_test_lib.erl b/test/emqx_test_lib.erl deleted file mode 100644 index dfb98d598..000000000 --- a/test/emqx_test_lib.erl +++ /dev/null @@ -1,37 +0,0 @@ -%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. - --module(emqx_test_lib). - --export([ with_env/2 - ]). - -with_env([], F) -> F(); -with_env([{Key, Val} | Rest], F) -> - Origin = get_env(Key), - try - ok = set_env(Key, Val), - with_env(Rest, F) - after - case Origin of - undefined -> ok = unset_env(Key); - _ -> ok = set_env(Key, Origin) - end - end. - -get_env(Key) -> application:get_env(?APPLICATION, Key). -set_env(Key, Val) -> application:set_env(?APPLICATION, Key, Val). -unset_env(Key) -> application:unset_env(?APPLICATION, Key). - - From ee7a7e24794df53d86798582a5938ea16aa27b5a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 22 Sep 2018 06:05:29 +0800 Subject: [PATCH 09/11] Fix typo --- src/emqx_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 4290ae0d9..dc8439cca 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -217,7 +217,7 @@ handle_info({timeout, Timer, emit_stats}, emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NewState = State#state{stats_timer = undefined}, Limits = erlang:get(force_shutdown_policy), - case meqx_misc:conn_proc_mng_policy(Limits) of + case emqx_misc:conn_proc_mng_policy(Limits) of continue -> {noreply, NewState}; hibernate -> From 29787d8945de7225e75b157fbeb7fac23ee892dc Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 22 Sep 2018 06:08:53 +0800 Subject: [PATCH 10/11] Use '0MB' to configure bytes of force_gc_policy --- priv/emqx.schema | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 9fe26ead3..681f18705 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -828,7 +828,7 @@ end}. %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. {mapping, "zone.$name.force_gc_policy", "emqx.zones", [ - {default, "0 | 0"}, + {default, "0 | 0MB"}, {datatype, string} ]}. From 05a5ad0f8ce0a6a9e7c64636d702cd2beeaaa24f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 22 Sep 2018 06:14:43 +0800 Subject: [PATCH 11/11] Use '0MB' to configure size of force_shutdown_policy --- priv/emqx.schema | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 681f18705..04ce8f076 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -839,7 +839,7 @@ end}. %% Total heap size is the in Erlang 'words' not in 'bytes'. %% Zero or negative is to disable. {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ - {default, "0 | 0"}, + {default, "0 | 0MB"}, {datatype, string} ]}. @@ -856,8 +856,14 @@ end}. bytes => list_to_integer(Bytes)}}; ("force_shutdown_policy", Val) -> [Len, Siz] = string:tokens(Val, "| "), - {force_shutdown_policy, #{message_queue_len => list_to_integer(Len), - total_heap_size => list_to_integer(Siz)}}; + ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of + {error, Reason} -> + error(Reason); + Siz1 -> + #{message_queue_len => list_to_integer(Len), + total_heap_size => Siz1} + end, + {force_shutdown_policy, ShutdownPolicy}; (Opt, Val) -> {list_to_atom(Opt), Val} end,