diff --git a/.github/workflows/run_emqx_app_tests.yaml b/.github/workflows/run_emqx_app_tests.yaml index 24c3d2b42..b37070775 100644 --- a/.github/workflows/run_emqx_app_tests.yaml +++ b/.github/workflows/run_emqx_app_tests.yaml @@ -71,10 +71,10 @@ jobs: ./rebar3 xref ./rebar3 dialyzer ./rebar3 eunit -v - ./rebar3 ct -v --readable=true + ./rebar3 ct --name 'test@127.0.0.1' -v --readable=true ./rebar3 proper -d test/props - uses: actions/upload-artifact@v3 if: failure() with: - name: logs + name: logs-${{ matrix.runs-on }} path: apps/emqx/_build/test/logs diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 91809134c..b43cd5003 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -189,23 +189,11 @@ find_raw(KeyPath) -> -spec get_zone_conf(atom(), emqx_utils_maps:config_key_path()) -> term(). get_zone_conf(Zone, KeyPath) -> - case find(?ZONE_CONF_PATH(Zone, KeyPath)) of - %% not found in zones, try to find the global config - {not_found, _, _} -> - ?MODULE:get(KeyPath); - {ok, Value} -> - Value - end. + ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)). -spec get_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> term(). get_zone_conf(Zone, KeyPath, Default) -> - case find(?ZONE_CONF_PATH(Zone, KeyPath)) of - %% not found in zones, try to find the global config - {not_found, _, _} -> - ?MODULE:get(KeyPath, Default); - {ok, Value} -> - Value - end. + ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default). -spec put_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> ok. put_zone_conf(Zone, KeyPath, Conf) -> @@ -230,6 +218,9 @@ find_listener_conf(Type, Listener, KeyPath) -> -spec put(map()) -> ok. put(Config) -> + put_with_order(Config). + +put1(Config) -> maps:fold( fun(RootName, RootValue, _) -> ?MODULE:put([atom(RootName)], RootValue) @@ -245,8 +236,8 @@ erase(RootName) -> -spec put(emqx_utils_maps:config_key_path(), term()) -> ok. put(KeyPath, Config) -> - Putter = fun(Path, Map, Value) -> - emqx_utils_maps:deep_put(Path, Map, Value) + Putter = fun(_Path, Map, Value) -> + maybe_update_zone(KeyPath, Map, Value) end, do_put(?CONF, Putter, KeyPath, Config). @@ -339,7 +330,9 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> %% check configs against the schema {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}), save_to_app_env(AppEnvs), - ok = save_to_config_map(CheckedConf, RawConf). + ok = save_to_config_map(CheckedConf, RawConf), + maybe_init_default_zone(), + ok. %% Merge environment variable overrides on top, then merge with overrides. overlay_v0(SchemaMod, RawConf) when is_map(RawConf) -> @@ -788,3 +781,130 @@ to_atom_conf_path(Path, OnFail) -> V end end. + +%% @doc Init zones under root `zones' +%% 1. ensure one `default' zone as it is referenced by listeners. +%% if default zone is unset, clone all default values from `GlobalDefaults' +%% if default zone is set, values are merged with `GlobalDefaults' +%% 2. For any user defined zones, merge with `GlobalDefaults' +%% +%% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults) +%% are written in the PV storage during emqx config loading/initialization. +-spec maybe_init_default_zone() -> skip | ok. +maybe_init_default_zone() -> + case emqx_config:get([zones], ?CONFIG_NOT_FOUND_MAGIC) of + ?CONFIG_NOT_FOUND_MAGIC -> + skip; + Zones0 when is_map(Zones0) -> + Zones = + case Zones0 of + #{default := _DefaultZone} = Z1 -> + Z1; + Z2 -> + Z2#{default => #{}} + end, + GLD = zone_global_defaults(), + NewZones = maps:map( + fun(_ZoneName, ZoneVal) -> + merge_with_global_defaults(GLD, ZoneVal) + end, + Zones + ), + ?MODULE:put([zones], NewZones) + end. + +-spec merge_with_global_defaults(map(), map()) -> map(). +merge_with_global_defaults(GlobalDefaults, ZoneVal) -> + emqx_utils_maps:deep_merge(GlobalDefaults, ZoneVal). + +%% @doc Update zones +%% when 1) zone updates, return *new* zones +%% when 2) zone global config updates, write to PT directly. +%% Zone global defaults are always presented in the configmap (PT) when updating zone +-spec maybe_update_zone(runtime_config_key_path(), RootValue :: map(), Val :: term()) -> + NewZoneVal :: map(). +maybe_update_zone([zones | T], ZonesValue, Value) -> + %% note, do not write to PT, return *New value* instead + NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value), + ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})), + %% Update only new zones with global defaults + GLD = zone_global_defaults(), + maps:fold( + fun(ZoneName, ZoneValue, Acc) -> + Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)} + end, + NewZonesValue, + maps:without(ExistingZoneNames, NewZonesValue) + ); +maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) -> + NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value), + case is_zone_root(RootName) of + false -> + skip; + true -> + %% When updates on global default roots. + ExistingZones = ?MODULE:get([zones], #{}), + RootNameBin = atom_to_binary(RootName), + NewZones = maps:map( + fun(ZoneName, ZoneVal) -> + BinPath = [<<"zones">>, atom_to_binary(ZoneName), RootNameBin], + case + %% look for user defined value from RAWCONF + ?MODULE:get_raw( + BinPath, + ?CONFIG_NOT_FOUND_MAGIC + ) + of + ?CONFIG_NOT_FOUND_MAGIC -> + ZoneVal#{RootName => NewRootValue}; + RawUserZoneRoot -> + UserDefinedValues = rawconf_to_conf( + emqx_schema, BinPath, RawUserZoneRoot + ), + ZoneVal#{ + RootName := + emqx_utils_maps:deep_merge( + NewRootValue, + UserDefinedValues + ) + } + end + end, + ExistingZones + ), + persistent_term:put(?PERSIS_KEY(?CONF, zones), NewZones) + end, + NewRootValue. + +zone_global_defaults() -> + maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]). + +-spec is_zone_root(atom) -> boolean(). +is_zone_root(Name) -> + lists:member(Name, zone_roots()). + +-spec zone_roots() -> [atom()]. +zone_roots() -> + lists:map(fun list_to_atom/1, emqx_zone_schema:roots()). + +%%% +%%% @doc During init, ensure order of puts that zone is put after the other global defaults. +%%% +put_with_order(#{zones := _Zones} = Conf) -> + put1(maps:without([zones], Conf)), + put1(maps:with([zones], Conf)); +put_with_order(Conf) -> + put1(Conf). + +%% +%% @doc Helper function that converts raw conf val to runtime conf val +%% with the types info from schema module +-spec rawconf_to_conf(module(), RawPath :: [binary()], RawValue :: term()) -> term(). +rawconf_to_conf(SchemaModule, RawPath, RawValue) -> + {_, RawUserDefinedValues} = + check_config( + SchemaModule, + emqx_utils_maps:deep_put(RawPath, #{}, RawValue) + ), + AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}), + emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues). diff --git a/apps/emqx/src/emqx_mqtt_caps.erl b/apps/emqx/src/emqx_mqtt_caps.erl index 897bb93c4..bf544280f 100644 --- a/apps/emqx/src/emqx_mqtt_caps.erl +++ b/apps/emqx/src/emqx_mqtt_caps.erl @@ -41,8 +41,6 @@ exclusive_subscription => boolean() }. --define(MAX_TOPIC_LEVELS, 65535). - -define(PUBCAP_KEYS, [ max_topic_levels, max_qos_allowed, @@ -154,8 +152,5 @@ get_caps(Zone) -> get_caps(Keys, Zone) -> maps:with( Keys, - maps:merge( - emqx_config:get([mqtt]), - emqx_config:get_zone_conf(Zone, [mqtt]) - ) + emqx_config:get_zone_conf(Zone, [mqtt]) ). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index ce02f16c3..5653cd2d2 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -27,176 +27,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). -force_gc_conf() -> - #{bytes => 16777216, count => 16000, enable => true}. - -force_shutdown_conf() -> - #{enable => true, max_heap_size => 4194304, max_mailbox_size => 1000}. - -rpc_conf() -> - #{ - async_batch_size => 256, - authentication_timeout => 5000, - call_receive_timeout => 15000, - connect_timeout => 5000, - mode => async, - port_discovery => stateless, - send_timeout => 5000, - socket_buffer => 1048576, - socket_keepalive_count => 9, - socket_keepalive_idle => 900, - socket_keepalive_interval => 75, - socket_recbuf => 1048576, - socket_sndbuf => 1048576, - tcp_client_num => 1, - tcp_server_port => 5369 - }. - -mqtt_conf() -> - #{ - await_rel_timeout => 300000, - idle_timeout => 15000, - ignore_loop_deliver => false, - keepalive_backoff => 0.75, - max_awaiting_rel => 100, - max_clientid_len => 65535, - max_inflight => 32, - max_mqueue_len => 1000, - max_packet_size => 1048576, - max_qos_allowed => 2, - max_subscriptions => infinity, - max_topic_alias => 65535, - max_topic_levels => 128, - mqueue_default_priority => lowest, - mqueue_priorities => disabled, - mqueue_store_qos0 => true, - peer_cert_as_clientid => disabled, - peer_cert_as_username => disabled, - response_information => [], - retain_available => true, - retry_interval => 30000, - server_keepalive => disabled, - session_expiry_interval => 7200000, - shared_subscription => true, - strict_mode => false, - upgrade_qos => false, - use_username_as_clientid => false, - wildcard_subscription => true - }. - -listener_mqtt_tcp_conf() -> - #{ - acceptors => 16, - zone => default, - access_rules => ["allow all"], - bind => {{0, 0, 0, 0}, 1883}, - max_connections => 1024000, - mountpoint => <<>>, - proxy_protocol => false, - proxy_protocol_timeout => 3000, - tcp_options => #{ - active_n => 100, - backlog => 1024, - buffer => 4096, - high_watermark => 1048576, - nodelay => false, - reuseaddr => true, - send_timeout => 15000, - send_timeout_close => true - } - }. - -listener_mqtt_ws_conf() -> - #{ - acceptors => 16, - zone => default, - access_rules => ["allow all"], - bind => {{0, 0, 0, 0}, 8083}, - max_connections => 1024000, - mountpoint => <<>>, - proxy_protocol => false, - proxy_protocol_timeout => 3000, - tcp_options => - #{ - active_n => 100, - backlog => 1024, - buffer => 4096, - high_watermark => 1048576, - nodelay => false, - reuseaddr => true, - send_timeout => 15000, - send_timeout_close => true - }, - websocket => - #{ - allow_origin_absence => true, - check_origin_enable => false, - check_origins => [], - compress => false, - deflate_opts => - #{ - client_max_window_bits => 15, - mem_level => 8, - server_max_window_bits => 15 - }, - fail_if_no_subprotocol => true, - idle_timeout => 86400000, - max_frame_size => infinity, - mqtt_path => "/mqtt", - mqtt_piggyback => multiple, - % should allow uppercase in config - proxy_address_header => "X-Forwarded-For", - proxy_port_header => "x-forwarded-port", - supported_subprotocols => - ["mqtt", "mqtt-v3", "mqtt-v3.1.1", "mqtt-v5"] - } - }. - -listeners_conf() -> - #{ - tcp => #{default => listener_mqtt_tcp_conf()}, - ws => #{default => listener_mqtt_ws_conf()} - }. - -limiter_conf() -> - Make = fun() -> - #{ - burst => 0, - rate => infinity - } - end, - - lists:foldl( - fun(Name, Acc) -> - Acc#{Name => Make()} - end, - #{}, - [bytes, messages, message_routing, connection, internal] - ). - -stats_conf() -> - #{enable => true}. - -zone_conf() -> - #{}. - -basic_conf() -> - #{ - force_gc => force_gc_conf(), - force_shutdown => force_shutdown_conf(), - mqtt => mqtt_conf(), - rpc => rpc_conf(), - stats => stats_conf(), - listeners => listeners_conf(), - zones => zone_conf(), - limiter => limiter_conf() - }. - -set_test_listener_confs() -> - Conf = emqx_config:get([], #{}), - emqx_config:put(basic_conf()), - Conf. - %%-------------------------------------------------------------------- %% CT Callbacks %%-------------------------------------------------------------------- @@ -242,14 +72,11 @@ init_per_testcase(_TestCase, Config) -> fun(_) -> {ok, #{is_superuser => false}} end ), ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end), - %% Set confs - OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - [{config, OldConf} | Config]. + Config. end_per_testcase(_TestCase, Config) -> meck:unload([emqx_access_control]), - emqx_config:put(?config(config, Config)), emqx_common_test_helpers:stop_apps([]), Config. diff --git a/apps/emqx/test/emqx_client_SUITE.erl b/apps/emqx/test/emqx_client_SUITE.erl index 14617a152..c9923365e 100644 --- a/apps/emqx/test/emqx_client_SUITE.erl +++ b/apps/emqx/test/emqx_client_SUITE.erl @@ -383,7 +383,7 @@ t_certcn_as_clientid_tlsv1_2(_) -> tls_certcn_as_clientid('tlsv1.2'). t_peercert_preserved_before_connected(_) -> - ok = emqx_config:put_zone_conf(default, [mqtt], #{}), + ok = emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], false), ok = emqx_hooks:add( 'client.connect', {?MODULE, on_hook, ['client.connect', self()]}, diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index a55531c2d..050a4f22c 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -96,3 +97,302 @@ t_unknown_rook_keys(_) -> end ), ok. + +t_init_load_emqx_schema(Config) -> + emqx_config:erase_all(), + %% Given empty config file + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + %% When load emqx_schema + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% Then default zone is injected with all global defaults + Default = emqx_config:get([zones, default]), + MQTT = emqx_config:get([mqtt]), + Stats = emqx_config:get([stats]), + FD = emqx_config:get([flapping_detect]), + FS = emqx_config:get([force_shutdown]), + CC = emqx_config:get([conn_congestion]), + FG = emqx_config:get([force_gc]), + OP = emqx_config:get([overload_protection]), + ?assertMatch( + #{ + mqtt := MQTT, + stats := Stats, + flapping_detect := FD, + force_shutdown := FS, + conn_congestion := CC, + force_gc := FG, + overload_protection := OP + }, + Default + ). + +t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) -> + emqx_config:erase_all(), + %% Given empty config file + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + %% When emqx_schema is loaded + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% Then read for none existing zone should throw error + ?assertError( + {config_not_found, [zones, no_exists]}, + emqx_config:get([zones, no_exists]) + ). + +t_init_zones_load_other_schema(Config) -> + emqx_config:erase_all(), + %% Given empty config file + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + %% When load emqx_limiter_schema, not emqx_schema + %% Then load should success + ?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)), + %% Then no zones is loaded. + ?assertError( + {config_not_found, [zones]}, + emqx_config:get([zones]) + ), + %% Then no default zone is loaded. + ?assertError( + {config_not_found, [zones, default]}, + emqx_config:get([zones, default]) + ). + +t_init_zones_with_user_defined_default_zone(Config) -> + emqx_config:erase_all(), + %% Given user defined config for default zone + ConfFile = prepare_conf_file( + ?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config + ), + application:set_env(emqx, config_files, [ConfFile]), + %% When schema is loaded + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + + %% Then user defined value is set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, default])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()), + ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others). + +t_init_zones_with_user_defined_other_zone(Config) -> + emqx_config:erase_all(), + %% Given user defined config for default zone + ConfFile = prepare_conf_file( + ?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config + ), + application:set_env(emqx, config_files, [ConfFile]), + %% When schema is loaded + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% Then user defined value is set and others are defaults + + %% Then user defined value is set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()), + ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others), + %% Then default zone still have the defaults + ?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])). + +t_init_zones_with_cust_root_mqtt(Config) -> + emqx_config:erase_all(), + %% Given config file with mqtt user overrides + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + %% When emqx_schema is loaded + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% Then the value is reflected as internal representation in default `zone' + %% and other fields under mqtt are defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{retry_interval := 600000}, + emqx_config:get([zones, default, mqtt]) + ). + +t_default_zone_is_updated_after_global_defaults_updated(Config) -> + emqx_config:erase_all(), + %% Given empty emqx conf + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])), + %% When emqx_schema is loaded + emqx_config:put([mqtt, retry_interval], 900000), + %% Then the value is reflected in default `zone' and other fields under mqtt are defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{retry_interval := 900000}, + emqx_config:get([zones, default, mqtt]) + ). + +t_myzone_is_updated_after_global_defaults_updated(Config) -> + emqx_config:erase_all(), + %% Given emqx conf file with user override in myzone (none default zone) + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])), + %% When update another value of global default + emqx_config:put([mqtt, retry_interval], 900000), + %% Then the value is reflected in myzone and the user defined value unchanged. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{ + retry_interval := 900000, + max_inflight := 32 + }, + emqx_config:get([zones, myzone, mqtt]) + ), + %% Then the value is reflected in default zone as well. + ?assertEqual( + GDefaultMqtt#{retry_interval := 900000}, + emqx_config:get([zones, default, mqtt]) + ). + +t_zone_no_user_defined_overrides(Config) -> + emqx_config:erase_all(), + %% Given emqx conf file with user specified myzone + ConfFile = prepare_conf_file( + ?FUNCTION_NAME, <<"zones.myzone.mqtt.retry_interval=10m">>, Config + ), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 2), + %% Then the value is reflected in both default and myzone + ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])), + ?assertMatch(2, emqx_config:get([zones, myzone, mqtt, max_inflight])), + %% Then user defined value from config is not overwritten + ?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])). + +t_zone_no_user_defined_overrides_internal_represent(Config) -> + emqx_config:erase_all(), + %% Given emqx conf file with user specified myzone + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(1, emqx_config:get([zones, myzone, mqtt, max_inflight])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 2), + %% Then the value is reflected in default `zone' but not user-defined zone + ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])), + ?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])). + +t_update_global_defaults_no_updates_on_user_overrides(Config) -> + emqx_config:erase_all(), + %% Given default zone config in conf file. + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(1, emqx_config:get([zones, default, mqtt, max_inflight])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 20), + %% Then the value is not reflected in default `zone' + ?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])). + +t_zone_update_with_new_zone(Config) -> + emqx_config:erase_all(), + %% Given loaded an empty conf file + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% When there is an update for creating new zone config + ok = emqx_config:put([zones, myzone, mqtt, max_inflight], 2), + %% Then the value is set and other roots are created with defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{max_inflight := 2}, + emqx_config:get([zones, myzone, mqtt]) + ). + +t_init_zone_with_global_defaults(_Config) -> + %% Given uninitialized empty config + emqx_config:erase_all(), + Zones = #{myzone => #{mqtt => #{max_inflight => 3}}}, + %% when put zones with global default with emqx_config:put/1 + GlobalDefaults = zone_global_defaults(), + AllConf = maps:put(zones, Zones, GlobalDefaults), + %% Then put sucess + ?assertEqual(ok, emqx_config:put(AllConf)), + %% Then GlobalDefaults are set + ?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))), + %% Then my zone and default zone are set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, GlobalDefaults), + ?assertEqual(ZGDMQTT#{max_inflight := 3}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others). + +%%% +%%% Helpers +%%% +prepare_conf_file(Name, Content, CTConfig) -> + Filename = tc_conf_file(Name, CTConfig), + filelib:ensure_dir(Filename), + ok = file:write_file(Filename, Content), + Filename. + +tc_conf_file(TC, Config) -> + DataDir = ?config(data_dir, Config), + filename:join([DataDir, TC, 'emqx.conf']). + +zone_global_defaults() -> + #{ + conn_congestion => + #{enable_alarm => true, min_alarm_sustain_duration => 60000}, + flapping_detect => + #{ban_time => 300000, max_count => 15, window_time => disabled}, + force_gc => + #{bytes => 16777216, count => 16000, enable => true}, + force_shutdown => + #{ + enable => true, + max_heap_size => 4194304, + max_mailbox_size => 1000 + }, + mqtt => + #{ + await_rel_timeout => 300000, + exclusive_subscription => false, + idle_timeout => 15000, + ignore_loop_deliver => false, + keepalive_backoff => 0.75, + keepalive_multiplier => 1.5, + max_awaiting_rel => 100, + max_clientid_len => 65535, + max_inflight => 32, + max_mqueue_len => 1000, + max_packet_size => 1048576, + max_qos_allowed => 2, + max_subscriptions => infinity, + max_topic_alias => 65535, + max_topic_levels => 128, + mqueue_default_priority => lowest, + mqueue_priorities => disabled, + mqueue_store_qos0 => true, + peer_cert_as_clientid => disabled, + peer_cert_as_username => disabled, + response_information => [], + retain_available => true, + retry_interval => 30000, + server_keepalive => disabled, + session_expiry_interval => 7200000, + shared_subscription => true, + strict_mode => false, + upgrade_qos => false, + use_username_as_clientid => false, + wildcard_subscription => true + }, + overload_protection => + #{ + backoff_delay => 1, + backoff_gc => false, + backoff_hibernation => true, + backoff_new_conn => true, + enable => false + }, + stats => #{enable => true} + }. diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index de3672bf3..ea451eea5 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -57,7 +57,6 @@ init_per_suite(Config) -> ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end), - emqx_channel_SUITE:set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), Config. diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 21d8f0a2a..88fae7156 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -39,7 +39,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). %%-------------------------------------------------------------------- init_per_suite(Config) -> - emqx_channel_SUITE:set_test_listener_confs(), + emqx_common_test_helpers:start_apps([]), ok = meck:new( [emqx_hooks, emqx_metrics, emqx_broker], [passthrough, no_history, no_link] diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index e66b42bc8..7c6c144eb 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). -define(SUITE, ?MODULE). @@ -46,12 +47,30 @@ all() -> emqx_common_test_helpers:all(?SUITE). init_per_suite(Config) -> - net_kernel:start(['master@127.0.0.1', longnames]), + DistPid = + case net_kernel:nodename() of + ignored -> + %% calling `net_kernel:start' without `epmd' + %% running will result in a failure. + emqx_common_test_helpers:start_epmd(), + {ok, Pid} = net_kernel:start(['master@127.0.0.1', longnames]), + ct:pal("start epmd, node name: ~p", [node()]), + Pid; + _ -> + undefined + end, emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), - Config. + [{dist_pid, DistPid} | Config]. -end_per_suite(_Config) -> +end_per_suite(Config) -> + DistPid = ?config(dist_pid, Config), + case DistPid of + Pid when is_pid(Pid) -> + net_kernel:stop(); + _ -> + ok + end, emqx_common_test_helpers:stop_apps([]). init_per_testcase(Case, Config) -> diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 36ba6c6a0..d5b36c2c3 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -34,7 +34,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), - emqx_channel_SUITE:set_test_listener_confs(), ?check_trace( ?wait_async_action( emqx_common_test_helpers:start_apps([]), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index bc91bc0ef..06be28a9c 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -137,7 +137,6 @@ end_per_testcase(_, Config) -> Config. init_per_suite(Config) -> - emqx_channel_SUITE:set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), Config. diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 4ace80893..3fd21f389 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -55,7 +55,7 @@ t_start_no_session(_Config) -> Opts = #{ clientinfo => #{ clientid => ?CLIENT_ID, - zone => internal + zone => default }, conninfo => #{ clientid => ?CLIENT_ID, @@ -76,7 +76,7 @@ t_start_no_expire(_Config) -> Opts = #{ clientinfo => #{ clientid => ?CLIENT_ID, - zone => internal + zone => default }, conninfo => #{ clientid => ?CLIENT_ID, @@ -97,7 +97,7 @@ t_start_infinite_expire(_Config) -> Opts = #{ clientinfo => #{ clientid => ?CLIENT_ID, - zone => internal + zone => default }, conninfo => #{ clientid => ?CLIENT_ID, diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 2805d260d..5fca80db2 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -137,9 +137,14 @@ t_global_zone(_Config) -> ), ?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))), ?assertEqual( - emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]), + emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed]), emqx_utils_maps:deep_get([<<"mqtt">>, <<"max_qos_allowed">>], Zones) ), + ?assertError( + {config_not_found, [zones, no_default, mqtt, max_qos_allowed]}, + emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]) + ), + NewZones1 = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1), NewZones2 = emqx_utils_maps:deep_remove([<<"mqtt">>, <<"peer_cert_as_clientid">>], NewZones1), {ok, #{<<"mqtt">> := Res}} = update_global_zone(NewZones2), @@ -151,7 +156,11 @@ t_global_zone(_Config) -> }, Res ), - ?assertEqual(1, emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])), + ?assertEqual(1, emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed])), + ?assertError( + {config_not_found, [zones, no_default, mqtt, max_qos_allowed]}, + emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]) + ), %% Make sure the override config is updated, and remove the default value. ?assertMatch(#{<<"max_qos_allowed">> := 1}, read_conf(<<"mqtt">>)), diff --git a/changes/ce/perf-10790.en.md b/changes/ce/perf-10790.en.md new file mode 100644 index 000000000..cccb975f5 --- /dev/null +++ b/changes/ce/perf-10790.en.md @@ -0,0 +1,2 @@ +Reducing overhead of reading configs per zone. +