From 2ba624ac3189ffc24ea3ccb3b819e9d13d1964ba Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 13 Jan 2020 21:00:40 +0800 Subject: [PATCH 01/16] Add 'compose/1', 'compose/2' functions --- src/emqx_misc.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 0d4270dc9..f5fe5029f 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -23,6 +23,8 @@ -export([ merge_opts/2 , maybe_apply/2 + , compose/1 + , compose/2 , run_fold/3 , pipeline/3 , start_timer/2 @@ -61,6 +63,15 @@ maybe_apply(_Fun, undefined) -> maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). +-spec(compose(list(F)) -> G when F :: fun((any()) -> any()), + G :: fun((any()) -> any())). +compose([F|More]) -> compose(F, More). + +-spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)). +compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end; +compose(F, [G]) -> compose(F, G); +compose(F, [G|More]) -> compose(compose(F, G), More). + %% @doc RunFold run_fold([], Acc, _State) -> Acc; From 40ed0ce7add31cac7df21cb52b84fe2e14e8e045 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 15 Jan 2020 17:18:34 +0800 Subject: [PATCH 02/16] Defaults to disable force_shutdown_policy --- etc/emqx.conf | 12 ++++++------ priv/emqx.schema | 12 ------------ 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0162fae7c..99934f3ab 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -820,12 +820,12 @@ zone.internal.mqueue_store_qos0 = true ## Value: on | off zone.internal.enable_flapping_detect = off -## See zone.$name.force_shutdown_policy -## -## Default: -## - 10000|32MB on ARCH_64 system -## - 10000|16MB on ARCH_32 sytem -zone.internal.force_shutdown_policy = 100000|64MB +# ## See zone.$name.force_shutdown_policy +# ## +# ## Default: +# ## - 10000|32MB on ARCH_64 system +# ## - 10000|16MB on ARCH_32 sytem +# zone.internal.force_shutdown_policy = 100000|64MB ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index bbd0fb4ab..527e36a7a 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -923,7 +923,6 @@ end}. %% of queued MQTT messages of QoS 1 and 2. %% Zero or negative is to disable. {mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ - {default, "default"}, {datatype, string} ]}. @@ -963,17 +962,6 @@ end}. count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; - ("force_shutdown_policy", "default") -> - {DefaultLen, DefaultSize} = - case WordSize = erlang:system_info(wordsize) of - 8 -> % arch_64 - {10000, cuttlefish_bytesize:parse("32MB")}; - 4 -> % arch_32 - {10000, cuttlefish_bytesize:parse("16MB")} - end, - {force_shutdown_policy, #{message_queue_len => DefaultLen, - max_heap_size => DefaultSize div WordSize - }}; ("force_shutdown_policy", Val) -> [Len, Siz] = string:tokens(Val, "| "), MaxSiz = case WordSize = erlang:system_info(wordsize) of From 9a76164e65b5cea545f08271dddcaa9399260b47 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Wed, 15 Jan 2020 18:34:10 +0800 Subject: [PATCH 03/16] Update emqx.conf --- etc/emqx.conf | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 99934f3ab..8a2fc1996 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -820,12 +820,12 @@ zone.internal.mqueue_store_qos0 = true ## Value: on | off zone.internal.enable_flapping_detect = off -# ## See zone.$name.force_shutdown_policy -# ## -# ## Default: -# ## - 10000|32MB on ARCH_64 system -# ## - 10000|16MB on ARCH_32 sytem -# zone.internal.force_shutdown_policy = 100000|64MB +## See zone.$name.force_shutdown_policy +## +## Default: +## - 10000|32MB on ARCH_64 system +## - 10000|16MB on ARCH_32 sytem +## zone.internal.force_shutdown_policy = 100000|64MB ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## From f6b2c9a69fced22851d5d39de546dd9fd2da430c Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Thu, 16 Jan 2020 23:11:19 +0800 Subject: [PATCH 04/16] Correct timestamp for banned (#3188) --- priv/emqx.schema | 8 ++++---- src/emqx_banned.erl | 2 +- src/emqx_flapping.erl | 2 +- test/emqx_flapping_SUITE.erl | 4 ++-- test/mqtt_protocol_v5_SUITE.erl | 17 +++++++++++------ 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/priv/emqx.schema b/priv/emqx.schema index 527e36a7a..2763078f7 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -640,15 +640,15 @@ end}. {translation, "emqx.flapping_detect_policy", fun(Conf) -> Policy = cuttlefish:conf_get("flapping_detect_policy", Conf), [Threshold, Duration, Interval] = string:tokens(Policy, ", "), - ParseDuration = fun(S) -> - case cuttlefish_duration:parse(S, ms) of + ParseDuration = fun(S, Dur) -> + case cuttlefish_duration:parse(S, Dur) of I when is_integer(I) -> I; {error, Reason} -> error(Reason) end end, #{threshold => list_to_integer(Threshold), - duration => ParseDuration(Duration), - banned_interval => ParseDuration(Interval) + duration => ParseDuration(Duration, ms), + banned_interval => ParseDuration(Interval, s) } end}. diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index 1ca0ecf8f..80f93be70 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -85,7 +85,7 @@ do_check(Who) when is_tuple(Who) -> case mnesia:dirty_read(?BANNED_TAB, Who) of [] -> false; [#banned{until = Until}] -> - Until > erlang:system_time(millisecond) + Until > erlang:system_time(second) end. -spec(create(emqx_types:banned()) -> ok). diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index c1faa7e95..2256ef015 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -124,7 +124,7 @@ handle_cast({detected, #flapping{clientid = ClientId, true -> %% Flapping happened:( ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", [ClientId, inet:ntoa(PeerHost), DetectCnt, Duration]), - Now = erlang:system_time(millisecond), + Now = erlang:system_time(second), Banned = #banned{who = {clientid, ClientId}, by = <<"flapping detector">>, reason = <<"flapping is detected">>, diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index 4d9fd5907..223fd5d7e 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -31,7 +31,7 @@ set_special_configs(emqx) -> application:set_env(emqx, flapping_detect_policy, #{threshold => 3, duration => 100, - banned_interval => 200 + banned_interval => 2 }); set_special_configs(_App) -> ok. @@ -52,7 +52,7 @@ t_detect_check(_) -> true = emqx_flapping:detect(ClientInfo), timer:sleep(100), true = emqx_banned:check(ClientInfo), - timer:sleep(200), + timer:sleep(3000), false = emqx_banned:check(ClientInfo), Childrens = supervisor:which_children(emqx_cm_sup), {flapping, Pid, _, _} = lists:keyfind(flapping, 1, Childrens), diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 0f5fe3f07..01d4391f0 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -147,8 +147,8 @@ t_connect_keepalive_timeout(_) -> Msg -> ReasonCode = 141, ?assertMatch({disconnected, ReasonCode, _Channel}, Msg) - after - round(timer:seconds(Keepalive) * 2 * 1.5 ) -> error("keepalive timeout") + after round(timer:seconds(Keepalive) * 2 * 1.5 ) -> + error("keepalive timeout") end. %%-------------------------------------------------------------------- @@ -160,7 +160,7 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> application:set_env(emqx, shared_dispatch_ack_enabled, true), Topic = nth(1, ?TOPICS), - Shared_topic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)), + SharedTopic = list_to_binary("$share/sharename/" ++ binary_to_list(<<"TopicA">>)), CRef = counters:new(1, [atomics]), meck:expect(emqtt, connected, @@ -174,18 +174,23 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> {clientid, <<"sub_client_1">>}, {keepalive, 5}]), {ok, _} = emqtt:connect(Sub1), - {ok, _, [2]} = emqtt:subscribe(Sub1, Shared_topic, qos2), + {ok, _, [2]} = emqtt:subscribe(Sub1, SharedTopic, qos2), + {ok, Sub2} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"sub_client_2">>}, {keepalive, 5}]), {ok, _} = emqtt:connect(Sub2), - {ok, _, [2]} = emqtt:subscribe(Sub2, Shared_topic, qos2), + {ok, _, [2]} = emqtt:subscribe(Sub2, SharedTopic, qos2), {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]), {ok, _} = emqtt:connect(Pub), {ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), receive - {disconnected,shutdown,for_testiong} -> ok + {'EXIT', _,{shutdown, for_testiong}} -> + ok + after 1000 -> + error("disconnected timeout") end, + ?assertEqual(1, counters:get(CRef, 1)). From 2b3003b323ab2afe9683bd7ff579f58735a052b6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 16 Jan 2020 23:45:17 +0800 Subject: [PATCH 05/16] Move the global GC server from 'emqx-recon' to 'emqx' project (#3190) --- etc/emqx.conf | 12 +++++ priv/emqx.schema | 7 ++- src/emqx_global_gc.erl | 97 +++++++++++++++++++++++++++++++++++ src/emqx_kernel_sup.erl | 10 ++-- test/emqx_global_gc_SUITE.erl | 33 ++++++++++++ 5 files changed, 153 insertions(+), 6 deletions(-) create mode 100644 src/emqx_global_gc.erl create mode 100644 test/emqx_global_gc_SUITE.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index 8a2fc1996..1564ed73d 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -249,6 +249,18 @@ node.dist_buffer_size = 8MB ## vm.args: +e Number node.max_ets_tables = 256000 +## Global GC Interval. +## +## Value: Duration +## +## Examples: +## - 2h: 2 hours +## - 30m: 30 minutes +## - 20s: 20 seconds +## +## Defaut: 15 minutes +node.global_gc_interval = 15m + ## Tweak GC to run more often. ## ## Value: Number [0-65535] diff --git a/priv/emqx.schema b/priv/emqx.schema index 2763078f7..642da69a2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -205,8 +205,6 @@ end}. {default, "emqx@127.0.0.1"} ]}. - - %% @doc Specify SSL Options in the file if using SSL for erlang distribution {mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [ {datatype, string}, @@ -287,6 +285,11 @@ end}. end }. +%% @doc Global GC Interval +{mapping, "node.global_gc_interval", "emqx.global_gc_interval", [ + {datatype, {duration, s}} +]}. + %% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 {mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ {default, 1000}, diff --git a/src/emqx_global_gc.erl b/src/emqx_global_gc.erl new file mode 100644 index 000000000..a3be46a3d --- /dev/null +++ b/src/emqx_global_gc.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_global_gc). + +-behaviour(gen_server). + +-include("types.hrl"). + +-export([start_link/0, stop/0]). + +-export([run/0]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +%% 5 minutes +%% -define(DEFAULT_INTERVAL, 300000). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec(start_link() -> startlink_ret()). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec(run() -> {ok, timer:time()}). +run() -> gen_server:call(?MODULE, run, infinity). + +-spec(stop() -> ok). +stop() -> gen_server:stop(?MODULE). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + {ok, ensure_timer(#{timer => undefined})}. + +handle_call(run, _From, State) -> + {Time, _} = timer:tc(fun run_gc/0), + {reply, {ok, Time div 1000}, State, hibernate}; + +handle_call(_Req, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({timeout, TRef, run}, State = #{timer := TRef}) -> + run_gc(), + {noreply, ensure_timer(State), hibernate}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internel function +%%-------------------------------------------------------------------- + +ensure_timer(State) -> + case emqx:get_env(global_gc_interval) of + undefined -> State; + Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run), + State#{timer := TRef} + end. + +run_gc() -> + [garbage_collect(P) || P <- processes(), + {status, waiting} == process_info(P, status)]. + diff --git a/src/emqx_kernel_sup.erl b/src/emqx_kernel_sup.erl index 87beda11d..a1ba9cfe4 100644 --- a/src/emqx_kernel_sup.erl +++ b/src/emqx_kernel_sup.erl @@ -27,7 +27,8 @@ start_link() -> init([]) -> {ok, {{one_for_one, 10, 100}, - [child_spec(emqx_pool_sup, supervisor), + [child_spec(emqx_global_gc, worker), + child_spec(emqx_pool_sup, supervisor), child_spec(emqx_hooks, worker), child_spec(emqx_stats, worker), child_spec(emqx_metrics, worker), @@ -40,7 +41,8 @@ child_spec(M, worker) -> restart => permanent, shutdown => 5000, type => worker, - modules => [M]}; + modules => [M] + }; child_spec(M, supervisor) -> #{id => M, @@ -48,6 +50,6 @@ child_spec(M, supervisor) -> restart => permanent, shutdown => infinity, type => supervisor, - modules => [M]}. - + modules => [M] + }. diff --git a/test/emqx_global_gc_SUITE.erl b/test/emqx_global_gc_SUITE.erl new file mode 100644 index 000000000..528189300 --- /dev/null +++ b/test/emqx_global_gc_SUITE.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_global_gc_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +t_run_gc(_) -> + ok = application:set_env(emqx, global_gc_interval, 1), + {ok, _} = emqx_global_gc:start_link(), + ok = timer:sleep(1500), + {ok, MilliSecs} = emqx_global_gc:run(), + ct:print("Global GC: ~w(ms)~n", [MilliSecs]), + emqx_global_gc:stop(). + From a71486cac7545f8155dab9c1db6f635eb5e41ba9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 09:32:01 +0800 Subject: [PATCH 06/16] Tuning the number of threads in async thread pool (#3193) --- etc/emqx.conf | 2 +- etc/vm.args | 15 ++++++++++----- priv/emqx.schema | 1 - 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 1564ed73d..0c4ea81a5 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -211,7 +211,7 @@ node.data_dir = {{ platform_data_dir }} ## Value: 0-1024 ## ## vm.args: +A Number -node.async_threads = 32 +## node.async_threads = 4 ## Sets the maximum number of simultaneously existing processes for this ## system if a Number is passed as value. diff --git a/etc/vm.args b/etc/vm.args index 43e6467d9..a32619be1 100644 --- a/etc/vm.args +++ b/etc/vm.args @@ -1,6 +1,6 @@ -############################## -# Erlang VM Args -############################## +###################################################################### +## Erlang VM Args +###################################################################### ## NOTE: ## @@ -52,7 +52,8 @@ +spp true ## Sets the number of threads in async thread pool. Valid range is 0-1024. -#+A 8 +## Increase the parameter if there are many simultaneous file I/O operations. ++A 4 ## Sets the default heap size of processes to the size Size. #+hms 233 @@ -92,4 +93,8 @@ #+sct L0-3c0-3p0N0:L4-7c0-3p1N1 ## Sets the mapping of warning messages for error_logger -#+W w \ No newline at end of file +#+W w + +## Specifies how long time (in milliseconds) to spend shutting down the system. +## See: http://erlang.org/doc/man/erl.html +#-shutdown_time 15000 diff --git a/priv/emqx.schema b/priv/emqx.schema index 642da69a2..a6966f0a5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -235,7 +235,6 @@ end}. %% @doc More information at: http://erlang.org/doc/man/erl.html {mapping, "node.async_threads", "vm_args.+A", [ - {default, 64}, {datatype, integer}, {validators, ["range:0-1024"]} ]}. From a318532bb04abf014b2e668f273c529675c84579 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 09:42:16 +0800 Subject: [PATCH 07/16] Tuning the 'force_gc_policy' of MQTT connections (#3192) Tuning the 'force_gc_policy' of MQTT connections --- etc/emqx.conf | 7 +++++-- priv/emqx.schema | 1 - src/emqx_misc.erl | 3 +-- src/emqx_ws_connection.erl | 4 ++-- src/emqx_zone.erl | 4 +++- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 0c4ea81a5..b331fefd7 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -601,11 +601,11 @@ zone.external.enable_stats = on ## Default: ignore zone.external.acl_deny_action = ignore -## Force MQTT connection/session process GC after this number of +## Force the MQTT connection process GC after this number of ## messages | bytes passed through. ## ## Numbers delimited by `|'. Zero or negative is to disable. -zone.external.force_gc_policy = 1000|1MB +zone.external.force_gc_policy = 10000|10MB ## Max message queue length and total heap size to force shutdown ## connection/session process. @@ -792,6 +792,9 @@ zone.internal.enable_acl = off ## Default: ignore zone.internal.acl_deny_action = ignore +## See zone.$name.force_gc_policy +## zone.internal.force_gc_policy = 100000|100MB + ## See zone.$name.wildcard_subscription. ## ## Value: boolean diff --git a/priv/emqx.schema b/priv/emqx.schema index a6966f0a5..662415a68 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -915,7 +915,6 @@ end}. %% messages | bytes passed through. %% Numbers delimited by `|'. Zero or negative is to disable. {mapping, "zone.$name.force_gc_policy", "emqx.zones", [ - {default, "0 | 0MB"}, {datatype, string} ]}. diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index f5fe5029f..d08d3ba35 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -58,8 +58,7 @@ merge_opts(Defaults, Options) -> %% @doc Apply a function to a maybe argument. -spec(maybe_apply(fun((maybe(A)) -> maybe(A)), maybe(A)) -> maybe(A) when A :: any()). -maybe_apply(_Fun, undefined) -> - undefined; +maybe_apply(_Fun, undefined) -> undefined; maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ceae32823..204dc82ee 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -429,9 +429,9 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) -> run_gc(Stats, State = #state{gc_state = GcSt}) -> case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of + false -> State; {_IsGC, GcSt1} -> - State#state{gc_state = GcSt1}; - false -> State + State#state{gc_state = GcSt1} end. check_oom(State = #state{channel = Channel}) -> diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 0304f4c8a..68befdfdc 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -45,6 +45,8 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , get_env/2 + , get_env/3 ]}). %% APIs @@ -114,7 +116,7 @@ start_link() -> stop() -> gen_server:stop(?SERVER). --spec(init_gc_state(zone()) -> emqx_gc:gc_state()). +-spec(init_gc_state(zone()) -> maybe(emqx_gc:gc_state())). init_gc_state(Zone) -> maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). From 3ae3d8a40d9d0d38faecf43775a28d72051c0ac8 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 09:58:34 +0800 Subject: [PATCH 08/16] Tune and optimize the Erlang VM (#3195) --- etc/emqx.conf | 12 ++++++------ etc/vm.args | 11 ++++++----- priv/emqx.schema | 9 ++------- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index b331fefd7..4c8195b1a 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -221,7 +221,7 @@ node.data_dir = {{ platform_data_dir }} ## Value: Number [1024-134217727] ## ## vm.args: +P Number -node.process_limit = 2048000 +## node.process_limit = 2048000 ## Sets the maximum number of simultaneously existing ports for this system. ## @@ -230,7 +230,7 @@ node.process_limit = 2048000 ## Value: Number [1024-134217727] ## ## vm.args: +Q Number -node.max_ports = 1024000 +## node.max_ports = 1024000 ## Set the distribution buffer busy limit (dist_buf_busy_limit). ## @@ -239,7 +239,7 @@ node.max_ports = 1024000 ## Value: Number [1KB-2GB] ## ## vm.args: +zdbbl size -node.dist_buffer_size = 8MB +## node.dist_buffer_size = 8MB ## Sets the maximum number of ETS tables. Note that mnesia and SSL will ## create temporary ETS tables. @@ -247,7 +247,7 @@ node.dist_buffer_size = 8MB ## Value: Number ## ## vm.args: +e Number -node.max_ets_tables = 256000 +## node.max_ets_tables = 256000 ## Global GC Interval. ## @@ -266,7 +266,7 @@ node.global_gc_interval = 15m ## Value: Number [0-65535] ## ## vm.args: -env ERL_FULLSWEEP_AFTER Number -node.fullsweep_after = 1000 +## node.fullsweep_after = 1000 ## Crash dump log file. ## @@ -289,7 +289,7 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump ## Value: Number ## ## vm.args: -kernel net_ticktime Number -node.dist_net_ticktime = 60 +## node.dist_net_ticktime = 120 ## Sets the port range for the listener socket of a distributed Erlang node. ## Note that if there are firewalls between clustered nodes, this port segment diff --git a/etc/vm.args b/etc/vm.args index a32619be1..37c09da31 100644 --- a/etc/vm.args +++ b/etc/vm.args @@ -10,13 +10,13 @@ ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. ## Sets the maximum number of simultaneously existing processes for this system. -#+P 2048000 ++P 2048000 ## Sets the maximum number of simultaneously existing ports for this system. -#+Q 1024000 ++Q 1024000 ## Sets the maximum number of ETS tables -#+e 256000 ++e 256000 ## Sets the maximum number of atoms the virtual machine can handle. #+t 1048576 @@ -26,7 +26,7 @@ ## Set how many times generational garbages collections can be done without ## forcing a fullsweep collection. -#-env ERL_FULLSWEEP_AFTER 1000 +-env ERL_FULLSWEEP_AFTER 1000 ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## (Disabled by default..use with caution!) @@ -43,7 +43,7 @@ ## Specifies the net_kernel tick time in seconds. ## This is the approximate time a connected node may be unresponsive until ## it is considered down and thereby disconnected. -#-kernel net_ticktime 60 +-kernel net_ticktime 120 ## Sets the distribution buffer busy limit (dist_buf_busy_limit). #+zdbbl 8192 @@ -98,3 +98,4 @@ ## Specifies how long time (in milliseconds) to spend shutting down the system. ## See: http://erlang.org/doc/man/erl.html #-shutdown_time 15000 + diff --git a/priv/emqx.schema b/priv/emqx.schema index 662415a68..fb05700c5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -242,16 +242,12 @@ end}. %% @doc Erlang Process Limit {mapping, "node.process_limit", "vm_args.+P", [ {datatype, integer}, - {default, 256000}, hidden ]}. -%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q -%% @doc The number of concurrent ports/sockets +%% @doc The maximum number of concurrent ports/sockets. %% Valid range is 1024-134217727 -{mapping, "node.max_ports", - cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ - {default, 262144}, +{mapping, "node.max_ports", "vm_args.+Q", [ {datatype, integer}, {validators, ["range4ports"]} ]}. @@ -319,7 +315,6 @@ end}. %% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime {mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ - {commented, 60}, {datatype, integer}, hidden ]}. From 2897e4fa06c287a621e4627631013523d33995df Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Fri, 17 Jan 2020 10:08:34 +0800 Subject: [PATCH 09/16] Fix the bug that cannot add a second MQTT-WS TCP Port (#3196) --- src/emqx_listeners.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 39f0c29c8..2990878dd 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -82,7 +82,7 @@ start_mqtt_listener(Name, ListenOn, Options) -> {emqx_connection, start_link, [Options -- SockOpts]}). start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) -> - Start(Name, with_port(ListenOn, RanchOpts), ProtoOpts). + Start(ws_name(Name, ListenOn), with_port(ListenOn, RanchOpts), ProtoOpts). mqtt_path(Options) -> proplists:get_value(mqtt_path, Options, "/mqtt"). @@ -125,10 +125,10 @@ restart_listener(tcp, ListenOn, _Options) -> restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> esockd:reopen('mqtt:ssl', ListenOn); restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - cowboy:stop_listener('mqtt:ws'), + cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)), start_listener(Proto, ListenOn, Options); restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - cowboy:stop_listener('mqtt:wss'), + cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)), start_listener(Proto, ListenOn, Options); restart_listener(Proto, ListenOn, _Opts) -> esockd:reopen(Proto, ListenOn). @@ -156,10 +156,10 @@ stop_listener(tcp, ListenOn, _Opts) -> esockd:close('mqtt:tcp', ListenOn); stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls -> esockd:close('mqtt:ssl', ListenOn); -stop_listener(Proto, _ListenOn, _Opts) when Proto == http; Proto == ws -> - cowboy:stop_listener('mqtt:ws'); -stop_listener(Proto, _ListenOn, _Opts) when Proto == https; Proto == wss -> - cowboy:stop_listener('mqtt:wss'); +stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws -> + cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)); +stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss -> + cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)); stop_listener(Proto, ListenOn, _Opts) -> esockd:close(Proto, ListenOn). @@ -178,3 +178,7 @@ format({Addr, Port}) when is_list(Addr) -> format({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). +ws_name(Name, {_Addr, Port}) -> + ws_name(Name, Port); +ws_name(Name, Port) -> + list_to_atom(lists:concat([Name, ":", Port])). From 8734922abbe11196eb6006dee2d93cd3d5dd7386 Mon Sep 17 00:00:00 2001 From: tigercl Date: Fri, 17 Jan 2020 11:37:28 +0800 Subject: [PATCH 10/16] Fix rap handling and keep the value of retain flag in bridge mode (#3189) --- src/emqx_session.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 849491952..2b03b7868 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -516,12 +516,12 @@ enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos = false}) -> enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> - enrich_subopts(Opts, emqx_message:set_flag(retain, true, Msg), Session); -enrich_subopts([{rap, 0}|Opts], Msg, Session) -> - enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopts([{rap, 1}|Opts], Msg, Session) -> enrich_subopts(Opts, Msg, Session); +enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> + enrich_subopts(Opts, Msg, Session); +enrich_subopts([{rap, 0}|Opts], Msg, Session) -> + enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), enrich_subopts(Opts, Msg1, Session). From 9049c8de55187be14ccadef44895d621742b1b04 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Fri, 17 Jan 2020 15:46:08 +0800 Subject: [PATCH 11/16] Fix coverage status --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 816b787dd..c13895c2c 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![GitHub Release](https://img.shields.io/github/release/emqx/emqx?color=brightgreen)](https://github.com/emqx/emqx/releases) [![Build Status](https://travis-ci.org/emqx/emqx.svg)](https://travis-ci.org/emqx/emqx) -[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg)](https://coveralls.io/github/emqx/emqx) +[![Coverage Status](https://coveralls.io/repos/github/emqx/emqx/badge.svg?branch=master)](https://coveralls.io/github/emqx/emqx?branch=master) [![Docker Pulls](https://img.shields.io/docker/pulls/emqx/emqx)](https://hub.docker.com/r/emqx/emqx) [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) From 4b8b5bfa2cba880965cd79558c31a2d14d07922b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 15:47:13 +0800 Subject: [PATCH 12/16] Tuning the Erlang VM for the broker and edge (#3197) * Tuning the Erlang VM for the broker and edge * Remove +stbt and use +sbt instead --- etc/vm.args | 28 +++++++++++++++++++++------- etc/vm.args.edge | 32 ++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 15 deletions(-) diff --git a/etc/vm.args b/etc/vm.args index 37c09da31..435f443cb 100644 --- a/etc/vm.args +++ b/etc/vm.args @@ -1,5 +1,5 @@ ###################################################################### -## Erlang VM Args +## Erlang VM Args for EMQ X Broker ###################################################################### ## NOTE: @@ -10,13 +10,13 @@ ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. ## Sets the maximum number of simultaneously existing processes for this system. -+P 2048000 ++P 2097152 ## Sets the maximum number of simultaneously existing ports for this system. -+Q 1024000 ++Q 1048576 ## Sets the maximum number of ETS tables -+e 256000 ++e 262144 ## Sets the maximum number of atoms the virtual machine can handle. #+t 1048576 @@ -61,11 +61,20 @@ ## Sets the default binary virtual heap size of processes to the size Size. #+hmbs 46422 +## Sets the default maximum heap size of processes to the size Size. +## Defaults to 0, which means that no maximum heap size is used. +##For more information, see process_flag(max_heap_size, MaxHeapSize). +#+hmax 0 + +## Sets the default value for process flag message_queue_data. Defaults to on_heap. +#+hmqd on_heap | off_heap + ## Sets the number of IO pollsets to use when polling for I/O. #+IOp 1 ## Sets the number of IO poll threads to use when polling for I/O. -#+IOt 1 +## Increase this for the busy systems with many concurrent connection. ++IOt 4 ## Sets the number of scheduler threads to create and scheduler threads to set online. #+S 8:8 @@ -74,7 +83,7 @@ #+SDcpu 8:8 ## Sets the number of dirty I/O scheduler threads to create. -#+SDio 10 ++SDio 8 ## Suggested stack size, in kilowords, for scheduler threads. #+sss 32 @@ -95,7 +104,12 @@ ## Sets the mapping of warning messages for error_logger #+W w +## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp +#+C no_time_warp + +## Prevents loading information about source filenames and line numbers. +#+L + ## Specifies how long time (in milliseconds) to spend shutting down the system. ## See: http://erlang.org/doc/man/erl.html #-shutdown_time 15000 - diff --git a/etc/vm.args.edge b/etc/vm.args.edge index ccf4969e3..9f722d1dd 100644 --- a/etc/vm.args.edge +++ b/etc/vm.args.edge @@ -1,6 +1,6 @@ -############################## -# Erlang VM Args -############################## +###################################################################### +## Erlang VM Args for EMQ X Edge +###################################################################### ## NOTE: ## @@ -10,8 +10,7 @@ ## such as `node.name` for `-name` and `node.cooke` for `-setcookie`. ## Sets the maximum number of simultaneously existing processes for this system. -+P 20480 - ++P 16384 ## Sets the maximum number of simultaneously existing ports for this system. +Q 4096 @@ -19,7 +18,7 @@ +e 512 ## Sets the maximum number of atoms the virtual machine can handle. -+t 65536 ++t 262144 ## Set the location of crash dumps -env ERL_CRASH_DUMP {{ platform_log_dir }}/crash.dump @@ -30,7 +29,7 @@ ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## (Disabled by default..use with caution!) -#-heart +-heart ## Specify the erlang distributed protocol. ## Can be one of: inet_tcp, inet6_tcp, inet_tls @@ -52,6 +51,7 @@ +spp false ## Sets the number of threads in async thread pool. Valid range is 0-1024. +## Increase the parameter if there are many simultaneous file I/O operations. +A 1 ## Sets the default heap size of processes to the size Size. @@ -60,6 +60,14 @@ ## Sets the default binary virtual heap size of processes to the size Size. #+hmbs 46422 +## Sets the default maximum heap size of processes to the size Size. +## Defaults to 0, which means that no maximum heap size is used. +##For more information, see process_flag(max_heap_size, MaxHeapSize). +#+hmax 0 + +## Sets the default value for process flag message_queue_data. Defaults to on_heap. +#+hmqd on_heap | off_heap + ## Sets the number of IO pollsets to use when polling for I/O. +IOp 1 @@ -94,5 +102,13 @@ ## Sets the mapping of warning messages for error_logger #+W w -#Prevents loading information about source filenames and line numbers. +## Sets time warp mode: no_time_warp | single_time_warp | multi_time_warp +#+C no_time_warp + +## Prevents loading information about source filenames and line numbers. +L + +## Specifies how long time (in milliseconds) to spend shutting down the system. +## See: http://erlang.org/doc/man/erl.html +-shutdown_time 10000 + From 46479ed6088d7f211844e8afce74d45727510b49 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 16:18:11 +0800 Subject: [PATCH 13/16] Tune the default config for node --- etc/emqx.conf | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4c8195b1a..a603e5678 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -221,7 +221,7 @@ node.data_dir = {{ platform_data_dir }} ## Value: Number [1024-134217727] ## ## vm.args: +P Number -## node.process_limit = 2048000 +## node.process_limit = 2097152 ## Sets the maximum number of simultaneously existing ports for this system. ## @@ -230,9 +230,9 @@ node.data_dir = {{ platform_data_dir }} ## Value: Number [1024-134217727] ## ## vm.args: +Q Number -## node.max_ports = 1024000 +## node.max_ports = 1048576 -## Set the distribution buffer busy limit (dist_buf_busy_limit). +## Sets the distribution buffer busy limit (dist_buf_busy_limit). ## ## See: http://erlang.org/doc/man/erl.html ## @@ -247,7 +247,7 @@ node.data_dir = {{ platform_data_dir }} ## Value: Number ## ## vm.args: +e Number -## node.max_ets_tables = 256000 +## node.max_ets_tables = 262144 ## Global GC Interval. ## @@ -327,7 +327,7 @@ rpc.tcp_server_port = 5369 ## Value: Port [1024-65535] rpc.tcp_client_port = 5369 -## Number of utgoing RPC connections. +## Number of outgoing RPC connections. ## ## Value: Interger [1-256] rpc.tcp_client_num = 32 From 7a2234c608a98cd47bebb1726651cc09dbdab815 Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Fri, 17 Jan 2020 16:29:46 +0800 Subject: [PATCH 14/16] Improve emqx_mqtt_caps:get_caps/1 (#3198) --- src/emqx_mqtt_caps.erl | 41 ++++++++++++++++++++++++++--------- test/emqx_mqtt_caps_SUITE.erl | 18 +++++---------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 4f0998327..47f02edda 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -29,6 +29,8 @@ , get_caps/3 ]). +-export([default_caps/0]). + -export([default/0]). -export_type([caps/0]). @@ -116,23 +118,42 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(_Flags, _Caps) -> ok. --spec(get_caps(emqx_zone:zone()) -> caps()). -get_caps(Zone) -> - maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS). +default_caps() -> + ?DEFAULT_CAPS. --spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). -get_caps(Zone, publish) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)); -get_caps(Zone, subscribe) -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)). - --spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). get_caps(Zone, Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def). +get_caps(Zone, publish) -> + with_env(Zone, '$mqtt_pub_caps', + fun() -> + filter_caps(?PUBCAP_KEYS, get_caps(Zone)) + end); + +get_caps(Zone, subscribe) -> + with_env(Zone, '$mqtt_sub_caps', + fun() -> + filter_caps(?SUBCAP_KEYS, get_caps(Zone)) + end). + +get_caps(Zone) -> + with_env(Zone, '$mqtt_caps', + fun() -> + maps:map(fun(Cap, Def) -> + emqx_zone:get_env(Zone, Cap, Def) + end, ?DEFAULT_CAPS) + end). + filter_caps(Keys, Caps) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). -spec(default() -> caps()). default() -> ?DEFAULT_CAPS. +with_env(Zone, Key, InitFun) -> + case emqx_zone:get_env(Zone, Key) of + undefined -> Caps = InitFun(), + ok = emqx_zone:set_env(Zone, Key, Caps), + Caps; + ZoneCaps -> ZoneCaps + end. \ No newline at end of file diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index fb1d3ab3d..695d69642 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -28,9 +28,8 @@ t_check_pub(_) -> PubCaps = #{max_qos_allowed => ?QOS_1, retain_available => false }, - lists:foreach(fun({Key, Val}) -> - ok = emqx_zone:set_env(zone, Key, Val) - end, maps:to_list(PubCaps)), + emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + timer:sleep(50), ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, @@ -39,9 +38,7 @@ t_check_pub(_) -> PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags2)), - lists:foreach(fun({Key, _Val}) -> - true = emqx_zone:unset_env(zone, Key) - end, maps:to_list(PubCaps)). + emqx_zone:unset_env(zone, '$mqtt_pub_caps'). t_check_sub(_) -> SubOpts = #{rh => 0, @@ -54,9 +51,8 @@ t_check_sub(_) -> shared_subscription => false, wildcard_subscription => false }, - lists:foreach(fun({Key, Val}) -> - ok = emqx_zone:set_env(zone, Key, Val) - end, maps:to_list(SubCaps)), + emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + timer:sleep(50), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), @@ -64,6 +60,4 @@ t_check_sub(_) -> emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), - lists:foreach(fun({Key, _Val}) -> - true = emqx_zone:unset_env(zone, Key) - end, maps:to_list(SubCaps)). + emqx_zone:unset_env(zone, '$mqtt_pub_caps'). From 050a3feab23ff8ab308c2f16e7194e39ff21f681 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 16:59:36 +0800 Subject: [PATCH 15/16] Tune the 'force_gc_policy' and 'force_shutdown_policy' parameters (#3201) --- etc/emqx.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4c8195b1a..1e5136228 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -605,7 +605,7 @@ zone.external.acl_deny_action = ignore ## messages | bytes passed through. ## ## Numbers delimited by `|'. Zero or negative is to disable. -zone.external.force_gc_policy = 10000|10MB +zone.external.force_gc_policy = 16000|16MB ## Max message queue length and total heap size to force shutdown ## connection/session process. @@ -617,7 +617,7 @@ zone.external.force_gc_policy = 10000|10MB ## Default: ## - 10000|32MB on ARCH_64 system ## - 10000|16MB on ARCH_32 sytem -## zone.external.force_shutdown_policy = 10000|32MB +## zone.external.force_shutdown_policy = 32000|32MB ## Maximum MQTT packet size allowed. ## @@ -793,7 +793,7 @@ zone.internal.enable_acl = off zone.internal.acl_deny_action = ignore ## See zone.$name.force_gc_policy -## zone.internal.force_gc_policy = 100000|100MB +## zone.internal.force_gc_policy = 128000|128MB ## See zone.$name.wildcard_subscription. ## @@ -840,7 +840,7 @@ zone.internal.enable_flapping_detect = off ## Default: ## - 10000|32MB on ARCH_64 system ## - 10000|16MB on ARCH_32 sytem -## zone.internal.force_shutdown_policy = 100000|64MB +## zone.internal.force_shutdown_policy = 128000|128MB ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## From 36b3a443b74bb3ef966fc9fb0fdc9ea4c4a8cebf Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 17 Jan 2020 16:31:14 +0800 Subject: [PATCH 16/16] Reduce default tcp client nums to schedulers/2 --- etc/emqx.conf | 5 +++-- priv/emqx.schema | 11 +++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 1e5136228..e8d59cb65 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -327,10 +327,11 @@ rpc.tcp_server_port = 5369 ## Value: Port [1024-65535] rpc.tcp_client_port = 5369 -## Number of utgoing RPC connections. +## Number of Outgoing RPC connections. ## ## Value: Interger [1-256] -rpc.tcp_client_num = 32 +## Defaults to NumberOfCPUSchedulers / 2 +#rpc.tcp_client_num = 1 ## RCP Client connect timeout. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index fb05700c5..a7f53ba1f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -362,11 +362,18 @@ end}. %% Default TCP port for outgoing connections {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ - {default, 32}, + {default, 0}, {datatype, integer}, {validators, ["range:gt_0_lt_256"]} ]}. +{translation, "gen_rpc.tcp_client_num", fun(Conf) -> + case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of + 0 -> max(1, erlang:system_info(schedulers) div 2); + V -> V + end +end}. + %% Client connect timeout {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [ {default, "5s"}, @@ -428,7 +435,7 @@ end}. ]}. {validator, "range:gt_0_lt_256", "must greater than 0 and less than 256", - fun(X) -> X > 0 andalso X < 256 end + fun(X) -> X >= 0 andalso X < 256 end }. %%--------------------------------------------------------------------