From 67ff54d5d8177f8b91a50723d286c1ee73ea99d6 Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sat, 22 Sep 2018 08:53:49 +0200 Subject: [PATCH] Change from customized total heap size check to set process flag The `max_heap_size` process flag can be used to limit total heap size of a process, and it gives much more detailed crash log if the limit is hit. --- priv/emqx.schema | 6 +++--- src/emqx_connection.erl | 2 +- src/emqx_misc.erl | 30 ++++++++++++++++-------------- src/emqx_session.erl | 2 +- test/emqx_misc_tests.erl | 16 ++++++---------- 5 files changed, 27 insertions(+), 29 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index dc2acdd0a..521b9ee22 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -808,7 +808,6 @@ end}. %% connection/session process. %% Message queue here is the Erlang process mailbox, but not the number %% of queued MQTT messages of QoS 1 and 2. -%% Total heap size is the in Erlang 'words' not in 'bytes'. %% Zero or negative is to disable. {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ {default, "0 | 0MB"}, @@ -841,7 +840,8 @@ end}. {error, Reason} -> error(Reason); Bytes1 -> - #{bytes => Bytes1, count => list_to_integer(Count)} + #{bytes => Bytes1, + count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; ("force_shutdown_policy", Val) -> @@ -851,7 +851,7 @@ end}. error(Reason); Siz1 -> #{message_queue_len => list_to_integer(Len), - total_heap_size => Siz1} + max_heap_size => Siz1} end, {force_shutdown_policy, ShutdownPolicy}; (Opt, Val) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3176ad443..ccb5f59fa 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -152,7 +152,7 @@ init([Transport, RawSocket, Options]) -> }), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), - erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), + ok = emqx_misc:init_proc_mng_policy(Zone), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State, self(), IdleTimout); {error, Reason} -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 656b0fca3..03c42510c 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -15,7 +15,9 @@ -module(emqx_misc). -export([merge_opts/2, start_timer/2, start_timer/3, cancel_timer/1, - proc_name/2, proc_stats/0, proc_stats/1, conn_proc_mng_policy/1]). + proc_name/2, proc_stats/0, proc_stats/1]). + +-export([init_proc_mng_policy/1, conn_proc_mng_policy/1]). %% @doc Merge options -spec(merge_opts(list(), list()) -> list()). @@ -60,32 +62,35 @@ proc_stats(Pid) -> -define(DISABLED, 0). +init_proc_mng_policy(Zone) -> + #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy = + emqx_zone:get_env(Zone, force_shutdown_policy), + MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), + _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded + erlang:put(force_shutdown_policy, ShutdownPolicy), + ok. + %% @doc Check self() process status against connection/session process management policy, %% return `continue | hibernate | {shutdown, Reason}' accordingly. %% `continue': There is nothing out of the ordinary. %% `hibernate': Nothing to process in my mailbox, and since this check is triggered %% by a timer, we assume it is a fat chance to continue idel, hence hibernate. -%% `shutdown': Some numbers (message queue length or heap size have hit the limit), +%% `shutdown': Some numbers (message queue length hit the limit), %% hence shutdown for greater good (system stability). --spec(conn_proc_mng_policy(#{message_queue_len := integer(), - total_heap_size := integer() - } | undefined) -> continue | hibernate | {shutdown, _}). -conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen, - total_heap_size := MaxTotalHeapSize - }) -> +-spec(conn_proc_mng_policy(#{message_queue_len => integer()} | false) -> + continue | hibernate | {shutdown, _}). +conn_proc_mng_policy(#{message_queue_len := MaxMsgQueueLen}) -> Qlength = proc_info(message_queue_len), Checks = [{fun() -> is_message_queue_too_long(Qlength, MaxMsgQueueLen) end, {shutdown, message_queue_too_long}}, - {fun() -> is_heap_size_too_large(MaxTotalHeapSize) end, - {shutdown, total_heap_size_too_large}}, {fun() -> Qlength > 0 end, continue}, {fun() -> true end, hibernate} ], check(Checks); conn_proc_mng_policy(_) -> %% disable by default - conn_proc_mng_policy(#{message_queue_len => 0, total_heap_size => 0}). + conn_proc_mng_policy(#{message_queue_len => 0}). check([{Pred, Result} | Rest]) -> case Pred() of @@ -96,9 +101,6 @@ check([{Pred, Result} | Rest]) -> is_message_queue_too_long(Qlength, Max) -> is_enabled(Max) andalso Qlength > Max. -is_heap_size_too_large(Max) -> - is_enabled(Max) andalso proc_info(total_heap_size) > Max. - is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9a5689401..d327723f5 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -369,7 +369,7 @@ init([Parent, #{zone := Zone, emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false), ok = emqx_gc:init(GcPolicy), - erlang:put(force_shutdown_policy, emqx_zone:get_env(Zone, force_shutdown_policy)), + ok = emqx_misc:init_proc_mng_policy(Zone), ok = proc_lib:init_ack(Parent, {ok, self()}), gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). diff --git a/test/emqx_misc_tests.erl b/test/emqx_misc_tests.erl index 40a9aae87..50513ee86 100644 --- a/test/emqx_misc_tests.erl +++ b/test/emqx_misc_tests.erl @@ -24,23 +24,19 @@ timer_cancel_flush_test() -> shutdown_disabled_test() -> self() ! foo, - ?assertEqual(continue, conn_proc_mng_policy(0, 0)), + ?assertEqual(continue, conn_proc_mng_policy(0)), receive foo -> ok end, - ?assertEqual(hibernate, conn_proc_mng_policy(0, 0)). + ?assertEqual(hibernate, conn_proc_mng_policy(0)). message_queue_too_long_test() -> self() ! foo, self() ! bar, ?assertEqual({shutdown, message_queue_too_long}, - conn_proc_mng_policy(1, 0)), + conn_proc_mng_policy(1)), receive foo -> ok end, - ?assertEqual(continue, conn_proc_mng_policy(1, 0)), + ?assertEqual(continue, conn_proc_mng_policy(1)), receive bar -> ok end. -total_heap_size_too_large_test() -> - ?assertEqual({shutdown, total_heap_size_too_large}, - conn_proc_mng_policy(0, 1)). +conn_proc_mng_policy(L) -> + emqx_misc:conn_proc_mng_policy(#{message_queue_len => L}). -conn_proc_mng_policy(L, S) -> - emqx_misc:conn_proc_mng_policy(#{message_queue_len => L, - total_heap_size => S}).