diff --git a/etc/emqx.conf b/etc/emqx.conf index 1d623953b..df50884e9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -371,8 +371,8 @@ log.file = emqx.log ## Limits the total number of characters printed for each log event. ## ## Value: Integer -## Default: 1024 -log.chars_limit = 1024 +## Default: 8192 +log.chars_limit = 8192 ## Maximum size of each log file. ## @@ -550,6 +550,18 @@ zone.external.acl_deny_action = ignore ## Numbers delimited by `|'. Zero or negative is to disable. zone.external.force_gc_policy = 1000|1MB +## Max message queue length and total heap size to force shutdown +## connection/session process. +## Message queue here is the Erlang process mailbox, but not the number +## of queued MQTT messages of QoS 1 and 2. +## +## Numbers delimited by `|'. Zero or negative is to disable. +## +## Default: +## - 8000|800MB on ARCH_64 system +## - 1000|100MB on ARCH_32 sytem +## zone.external.force_shutdown_policy = 8000|800MB + ## Maximum MQTT packet size allowed. ## ## Value: Bytes @@ -2210,19 +2222,43 @@ broker.route_batch_clean = off ## System Monitor ##-------------------------------------------------------------------- -## Enable Long GC monitoring. +## Enable Long GC monitoring. Disable if the value is 0. ## Notice: don't enable the monitor in production for: ## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 ## -## Value: true | false -sysmon.long_gc = false +## Value: Duration +## - h: hour +## - m: minute +## - s: second +## - ms: milliseconds +## +## Examples: +## - 2h: 2 hours +## - 30m: 30 minutes +## - 0.1s: 0.1 seconds +## - 100ms : 100 milliseconds +## +## Default: 0ms +sysmon.long_gc = 0 ## Enable Long Schedule(ms) monitoring. ## ## See: http://erlang.org/doc/man/erlang.html#system_monitor-2 ## -## Value: Number -sysmon.long_schedule = 240 +## Value: Duration +## - h: hour +## - m: minute +## - s: second +## - ms: milliseconds +## +## Examples: +## - 2h: 2 hours +## - 30m: 30 minutes +## - 0.1s: 0.1 seconds +## - 100ms: 100 milliseconds +## +## Default: 0ms +sysmon.long_schedule = 240ms ## Enable Large Heap monitoring. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 5c7f421ce..804ab041d 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -400,7 +400,7 @@ end}. {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. -{mapping, "log.primary_level", "emqx.primary_log_level", [ +{mapping, "log.primary_level", "kernel.primary_log_level", [ {default, error}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} ]}. @@ -420,8 +420,8 @@ end}. {datatype, file} ]}. -{mapping, "log.chars_limit", "log.chars_limit", [ - {default, 1024}, +{mapping, "log.chars_limit", "kernel.logger", [ + {default, 8192}, {datatype, integer} ]}. @@ -462,10 +462,6 @@ end}. hidden ]}. -{translation, "emqx.primary_log_level", fun(Conf) -> - cuttlefish:conf_get("log.level", Conf) -end}. - {translation, "kernel.logger", fun(Conf) -> LogTo = cuttlefish:conf_get("log.to", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf), @@ -854,7 +850,7 @@ end}. %% of queued MQTT messages of QoS 1 and 2. %% Zero or negative is to disable. {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ - {default, "0 | 0MB"}, + {default, "default"}, {datatype, string} ]}. @@ -903,15 +899,34 @@ end}. count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; + ("force_shutdown_policy", "default") -> + {DefaultLen, DefaultSize} = + case erlang:system_info(wordsize) of + 8 -> % arch_64 + {8000, cuttlefish_bytesize:parse("800MB")}; + 4 -> % arch_32 + {1000, cuttlefish_bytesize:parse("100MB")} + end, + {force_shutdown_policy, #{message_queue_len => DefaultLen, + max_heap_size => DefaultSize}}; ("force_shutdown_policy", Val) -> [Len, Siz] = string:tokens(Val, "| "), - ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of - {error, Reason} -> - error(Reason); - Siz1 -> - #{message_queue_len => list_to_integer(Len), - max_heap_size => Siz1} - end, + MaxSiz = case erlang:system_info(wordsize) of + 8 -> % arch_64 + (1 bsl 59) - 1; + 4 -> % arch_32 + (1 bsl 27) - 1 + end, + ShutdownPolicy = + case cuttlefish_bytesize:parse(Siz) of + {error, Reason} -> + error(Reason); + Siz1 when Siz1 > MaxSiz -> + cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); + Siz1 -> + #{message_queue_len => list_to_integer(Len), + max_heap_size => Siz1} + end, {force_shutdown_policy, ShutdownPolicy}; ("mqueue_priorities", Val) -> case Val of @@ -2059,14 +2074,14 @@ end}. %% @doc Long GC, don't monitor in production mode for: %% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 {mapping, "sysmon.long_gc", "emqx.sysmon", [ - {default, false}, - {datatype, {enum, [true, false]}} + {default, 0}, + {datatype, [integer, {duration, ms}]} ]}. %% @doc Long Schedule(ms) {mapping, "sysmon.long_schedule", "emqx.sysmon", [ - {default, 1000}, - {datatype, integer} + {default, 240}, + {datatype, [integer, {duration, ms}]} ]}. %% @doc Large Heap diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 2ef660521..bc9d58408 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -31,7 +31,7 @@ start(_Type, _Args) -> %% kernel config `logger_level` before starting the erlang vm. %% This is because the latter approach an annoying debug msg will be printed out: %% "[debug] got_unexpected_message {'EXIT',<0.1198.0>,normal}" - logger:set_primary_config(level, application:get_env(emqx, primary_log_level, error)), + logger:set_primary_config(level, application:get_env(kernel, primary_log_level, error)), print_banner(), ekka:start(), diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index bec56f142..771069704 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -73,6 +73,8 @@ -define(PUBSUB_STATS, [ 'topics/count', 'topics/max', + 'suboptions/count', + 'suboptions/max', 'subscribers/count', 'subscribers/max', 'subscriptions/count', @@ -242,9 +244,12 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ safe_update_element(Key, Val) -> - try ets:update_element(?TAB, Key, {2, Val}) + try ets:update_element(?TAB, Key, {2, Val}) of + false -> + ets:insert_new(?TAB, {Key, Val}); + true -> + true catch error:badarg -> - ets:insert_new(?TAB, {Key, Val}) + ?LOG(warning, "[Stats] Update ~p to ~p failed", [Key, Val]) end. - diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 03cc95819..b75503b53 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -33,9 +33,9 @@ , code_change/3 ]). --type(option() :: {long_gc, false | pos_integer()} - | {long_schedule, false | pos_integer()} - | {large_heap, pos_integer()} +-type(option() :: {long_gc, non_neg_integer()} + | {long_schedule, non_neg_integer()} + | {large_heap, non_neg_integer()} | {busy_port, boolean()} | {busy_dist_port, boolean()}). @@ -66,11 +66,11 @@ parse_opt(Opts) -> parse_opt(Opts, []). parse_opt([], Acc) -> Acc; -parse_opt([{long_gc, false}|Opts], Acc) -> +parse_opt([{long_gc, 0}|Opts], Acc) -> parse_opt(Opts, Acc); parse_opt([{long_gc, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_gc, Ms}|Acc]); -parse_opt([{long_schedule, false}|Opts], Acc) -> +parse_opt([{long_schedule, 0}|Opts], Acc) -> parse_opt(Opts, Acc); parse_opt([{long_schedule, Ms}|Opts], Acc) when is_integer(Ms) -> parse_opt(Opts, [{long_schedule, Ms}|Acc]); @@ -177,4 +177,3 @@ safe_publish(Event, WarnMsg) -> sysmon_msg(Topic, Payload) -> Msg = emqx_message:make(?SYSMON, Topic, Payload), emqx_message:set_flag(sys, Msg). - diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 44e1eb681..e828dd0a2 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -160,6 +160,7 @@ websocket_init(#state{request = Req, options = Options}) -> EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), + ok = emqx_misc:init_proc_mng_policy(Zone), {ok, #state{peername = Peername, sockname = Sockname, parse_state = ParserState,