From 7a2234c608a98cd47bebb1726651cc09dbdab815 Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Fri, 17 Jan 2020 16:29:46 +0800 Subject: [PATCH 1/3] Improve emqx_mqtt_caps:get_caps/1 (#3198) --- src/emqx_mqtt_caps.erl | 41 ++++++++++++++++++++++++++--------- test/emqx_mqtt_caps_SUITE.erl | 18 +++++---------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 4f0998327..47f02edda 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -29,6 +29,8 @@ , get_caps/3 ]). +-export([default_caps/0]). + -export([default/0]). -export_type([caps/0]). @@ -116,23 +118,42 @@ do_check_sub(#{is_shared := true}, #{shared_subscription := false}) -> {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; do_check_sub(_Flags, _Caps) -> ok. --spec(get_caps(emqx_zone:zone()) -> caps()). -get_caps(Zone) -> - maps:map(fun(Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def) end, ?DEFAULT_CAPS). +default_caps() -> + ?DEFAULT_CAPS. --spec(get_caps(emqx_zone:zone(), publish|subscribe) -> caps()). -get_caps(Zone, publish) -> - filter_caps(?PUBCAP_KEYS, get_caps(Zone)); -get_caps(Zone, subscribe) -> - filter_caps(?SUBCAP_KEYS, get_caps(Zone)). - --spec(get_caps(emqx_zone:zone(), atom(), term()) -> term()). get_caps(Zone, Cap, Def) -> emqx_zone:get_env(Zone, Cap, Def). +get_caps(Zone, publish) -> + with_env(Zone, '$mqtt_pub_caps', + fun() -> + filter_caps(?PUBCAP_KEYS, get_caps(Zone)) + end); + +get_caps(Zone, subscribe) -> + with_env(Zone, '$mqtt_sub_caps', + fun() -> + filter_caps(?SUBCAP_KEYS, get_caps(Zone)) + end). + +get_caps(Zone) -> + with_env(Zone, '$mqtt_caps', + fun() -> + maps:map(fun(Cap, Def) -> + emqx_zone:get_env(Zone, Cap, Def) + end, ?DEFAULT_CAPS) + end). + filter_caps(Keys, Caps) -> maps:filter(fun(Key, _Val) -> lists:member(Key, Keys) end, Caps). -spec(default() -> caps()). default() -> ?DEFAULT_CAPS. +with_env(Zone, Key, InitFun) -> + case emqx_zone:get_env(Zone, Key) of + undefined -> Caps = InitFun(), + ok = emqx_zone:set_env(Zone, Key, Caps), + Caps; + ZoneCaps -> ZoneCaps + end. \ No newline at end of file diff --git a/test/emqx_mqtt_caps_SUITE.erl b/test/emqx_mqtt_caps_SUITE.erl index fb1d3ab3d..695d69642 100644 --- a/test/emqx_mqtt_caps_SUITE.erl +++ b/test/emqx_mqtt_caps_SUITE.erl @@ -28,9 +28,8 @@ t_check_pub(_) -> PubCaps = #{max_qos_allowed => ?QOS_1, retain_available => false }, - lists:foreach(fun({Key, Val}) -> - ok = emqx_zone:set_env(zone, Key, Val) - end, maps:to_list(PubCaps)), + emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), + timer:sleep(50), ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, retain => false}), PubFlags1 = #{qos => ?QOS_2, retain => false}, @@ -39,9 +38,7 @@ t_check_pub(_) -> PubFlags2 = #{qos => ?QOS_1, retain => true}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, emqx_mqtt_caps:check_pub(zone, PubFlags2)), - lists:foreach(fun({Key, _Val}) -> - true = emqx_zone:unset_env(zone, Key) - end, maps:to_list(PubCaps)). + emqx_zone:unset_env(zone, '$mqtt_pub_caps'). t_check_sub(_) -> SubOpts = #{rh => 0, @@ -54,9 +51,8 @@ t_check_sub(_) -> shared_subscription => false, wildcard_subscription => false }, - lists:foreach(fun({Key, Val}) -> - ok = emqx_zone:set_env(zone, Key, Val) - end, maps:to_list(SubCaps)), + emqx_zone:set_env(zone, '$mqtt_sub_caps', SubCaps), + timer:sleep(50), ok = emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts), ?assertEqual({error, ?RC_TOPIC_FILTER_INVALID}, emqx_mqtt_caps:check_sub(zone, <<"a/b/c/d">>, SubOpts)), @@ -64,6 +60,4 @@ t_check_sub(_) -> emqx_mqtt_caps:check_sub(zone, <<"+/#">>, SubOpts)), ?assertEqual({error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}, emqx_mqtt_caps:check_sub(zone, <<"topic">>, SubOpts#{share => true})), - lists:foreach(fun({Key, _Val}) -> - true = emqx_zone:unset_env(zone, Key) - end, maps:to_list(SubCaps)). + emqx_zone:unset_env(zone, '$mqtt_pub_caps'). From 050a3feab23ff8ab308c2f16e7194e39ff21f681 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 17 Jan 2020 16:59:36 +0800 Subject: [PATCH 2/3] Tune the 'force_gc_policy' and 'force_shutdown_policy' parameters (#3201) --- etc/emqx.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 4c8195b1a..1e5136228 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -605,7 +605,7 @@ zone.external.acl_deny_action = ignore ## messages | bytes passed through. ## ## Numbers delimited by `|'. Zero or negative is to disable. -zone.external.force_gc_policy = 10000|10MB +zone.external.force_gc_policy = 16000|16MB ## Max message queue length and total heap size to force shutdown ## connection/session process. @@ -617,7 +617,7 @@ zone.external.force_gc_policy = 10000|10MB ## Default: ## - 10000|32MB on ARCH_64 system ## - 10000|16MB on ARCH_32 sytem -## zone.external.force_shutdown_policy = 10000|32MB +## zone.external.force_shutdown_policy = 32000|32MB ## Maximum MQTT packet size allowed. ## @@ -793,7 +793,7 @@ zone.internal.enable_acl = off zone.internal.acl_deny_action = ignore ## See zone.$name.force_gc_policy -## zone.internal.force_gc_policy = 100000|100MB +## zone.internal.force_gc_policy = 128000|128MB ## See zone.$name.wildcard_subscription. ## @@ -840,7 +840,7 @@ zone.internal.enable_flapping_detect = off ## Default: ## - 10000|32MB on ARCH_64 system ## - 10000|16MB on ARCH_32 sytem -## zone.internal.force_shutdown_policy = 100000|64MB +## zone.internal.force_shutdown_policy = 128000|128MB ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## From 36b3a443b74bb3ef966fc9fb0fdc9ea4c4a8cebf Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Fri, 17 Jan 2020 16:31:14 +0800 Subject: [PATCH 3/3] Reduce default tcp client nums to schedulers/2 --- etc/emqx.conf | 5 +++-- priv/emqx.schema | 11 +++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 1e5136228..e8d59cb65 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -327,10 +327,11 @@ rpc.tcp_server_port = 5369 ## Value: Port [1024-65535] rpc.tcp_client_port = 5369 -## Number of utgoing RPC connections. +## Number of Outgoing RPC connections. ## ## Value: Interger [1-256] -rpc.tcp_client_num = 32 +## Defaults to NumberOfCPUSchedulers / 2 +#rpc.tcp_client_num = 1 ## RCP Client connect timeout. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index fb05700c5..a7f53ba1f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -362,11 +362,18 @@ end}. %% Default TCP port for outgoing connections {mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ - {default, 32}, + {default, 0}, {datatype, integer}, {validators, ["range:gt_0_lt_256"]} ]}. +{translation, "gen_rpc.tcp_client_num", fun(Conf) -> + case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of + 0 -> max(1, erlang:system_info(schedulers) div 2); + V -> V + end +end}. + %% Client connect timeout {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [ {default, "5s"}, @@ -428,7 +435,7 @@ end}. ]}. {validator, "range:gt_0_lt_256", "must greater than 0 and less than 256", - fun(X) -> X > 0 andalso X < 256 end + fun(X) -> X >= 0 andalso X < 256 end }. %%--------------------------------------------------------------------