From 923913e15c85cd13748bce0663c280fc7fb0b1eb Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 28 May 2023 16:19:32 +0200 Subject: [PATCH] feat(config): merge with global defaults when put new zone --- apps/emqx/src/emqx_config.erl | 178 +++++---- apps/emqx/test/emqx_config_SUITE.erl | 496 ++++++++++--------------- apps/emqx/test/emqx_takeover_SUITE.erl | 1 + 3 files changed, 311 insertions(+), 364 deletions(-) diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 6e36e9fa0..b43cd5003 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -218,6 +218,9 @@ find_listener_conf(Type, Listener, KeyPath) -> -spec put(map()) -> ok. put(Config) -> + put_with_order(Config). + +put1(Config) -> maps:fold( fun(RootName, RootValue, _) -> ?MODULE:put([atom(RootName)], RootValue) @@ -233,10 +236,8 @@ erase(RootName) -> -spec put(emqx_utils_maps:config_key_path(), term()) -> ok. put(KeyPath, Config) -> - Putter = fun(Path, Map, Value0) -> - Value = emqx_utils_maps:deep_put(Path, Map, Value0), - maybe_update_zone(KeyPath, Value0), - Value + Putter = fun(_Path, Map, Value) -> + maybe_update_zone(KeyPath, Map, Value) end, do_put(?CONF, Putter, KeyPath, Config). @@ -329,7 +330,9 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> %% check configs against the schema {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}), save_to_app_env(AppEnvs), - ok = save_to_config_map(CheckedConf, RawConf). + ok = save_to_config_map(CheckedConf, RawConf), + maybe_init_default_zone(), + ok. %% Merge environment variable overrides on top, then merge with overrides. overlay_v0(SchemaMod, RawConf) when is_map(RawConf) -> @@ -586,15 +589,6 @@ save_to_app_env(AppEnvs0) -> -spec save_to_config_map(config(), raw_config()) -> ok. save_to_config_map(Conf, RawConf) -> ?MODULE:put(Conf), - try emqx_config:get([zones]) of - Zones when is_map(Zones) -> - init_default_zone() - catch - error:{config_not_found, [zones]} -> - %% emqx schema is not loaded. - %% note, don't trust get_root_names/0 - skip - end, ?MODULE:put_raw(RawConf). -spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}. @@ -796,63 +790,121 @@ to_atom_conf_path(Path, OnFail) -> %% %% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults) %% are written in the PV storage during emqx config loading/initialization. --spec init_default_zone() -> ok. -init_default_zone() -> - Zones = - case ?MODULE:get([zones], #{}) of - #{default := _DefaultZone} = Z1 -> - Z1; - Z2 -> - Z2#{default => #{}} - end, - GlobalDefaults = maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]), - NewZones = maps:map( - fun(_ZoneName, ZoneVal) -> - merge_with_global_defaults(ZoneVal, GlobalDefaults) - end, - Zones - ), - ?MODULE:put([zones], NewZones). - -%% @TODO just use deep merge? --spec merge_with_global_defaults(map(), map()) -> map(). -merge_with_global_defaults(Val, Defaults) -> - maps:fold( - fun(K, V, Acc) -> - case maps:get(K, Acc, ?CONFIG_NOT_FOUND_MAGIC) of - ?CONFIG_NOT_FOUND_MAGIC -> - %% Use the value of global default - Acc#{K => V}; - Override -> - %% Merge with overrides - Acc#{K => emqx_utils_maps:deep_merge(V, Override)} - end - end, - Val, - Defaults - ). - -%% @doc Update zones in case global defaults are changed. --spec maybe_update_zone(runtime_config_key_path(), Val :: term()) -> skip | ok. -maybe_update_zone([], _Value) -> - skip; -maybe_update_zone([RootName | _T] = Path, Value) -> - case lists:member(RootName, zone_roots()) of - false -> +-spec maybe_init_default_zone() -> skip | ok. +maybe_init_default_zone() -> + case emqx_config:get([zones], ?CONFIG_NOT_FOUND_MAGIC) of + ?CONFIG_NOT_FOUND_MAGIC -> skip; - true -> - Zones = ?MODULE:get([zones], #{}), + Zones0 when is_map(Zones0) -> + Zones = + case Zones0 of + #{default := _DefaultZone} = Z1 -> + Z1; + Z2 -> + Z2#{default => #{}} + end, + GLD = zone_global_defaults(), NewZones = maps:map( fun(_ZoneName, ZoneVal) -> - %% @TODO we should not overwrite if it is a user defined value - emqx_utils_maps:deep_put(Path, ZoneVal, Value) + merge_with_global_defaults(GLD, ZoneVal) end, Zones ), - ?MODULE:put([zones], NewZones), - ok + ?MODULE:put([zones], NewZones) end. +-spec merge_with_global_defaults(map(), map()) -> map(). +merge_with_global_defaults(GlobalDefaults, ZoneVal) -> + emqx_utils_maps:deep_merge(GlobalDefaults, ZoneVal). + +%% @doc Update zones +%% when 1) zone updates, return *new* zones +%% when 2) zone global config updates, write to PT directly. +%% Zone global defaults are always presented in the configmap (PT) when updating zone +-spec maybe_update_zone(runtime_config_key_path(), RootValue :: map(), Val :: term()) -> + NewZoneVal :: map(). +maybe_update_zone([zones | T], ZonesValue, Value) -> + %% note, do not write to PT, return *New value* instead + NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value), + ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})), + %% Update only new zones with global defaults + GLD = zone_global_defaults(), + maps:fold( + fun(ZoneName, ZoneValue, Acc) -> + Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)} + end, + NewZonesValue, + maps:without(ExistingZoneNames, NewZonesValue) + ); +maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) -> + NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value), + case is_zone_root(RootName) of + false -> + skip; + true -> + %% When updates on global default roots. + ExistingZones = ?MODULE:get([zones], #{}), + RootNameBin = atom_to_binary(RootName), + NewZones = maps:map( + fun(ZoneName, ZoneVal) -> + BinPath = [<<"zones">>, atom_to_binary(ZoneName), RootNameBin], + case + %% look for user defined value from RAWCONF + ?MODULE:get_raw( + BinPath, + ?CONFIG_NOT_FOUND_MAGIC + ) + of + ?CONFIG_NOT_FOUND_MAGIC -> + ZoneVal#{RootName => NewRootValue}; + RawUserZoneRoot -> + UserDefinedValues = rawconf_to_conf( + emqx_schema, BinPath, RawUserZoneRoot + ), + ZoneVal#{ + RootName := + emqx_utils_maps:deep_merge( + NewRootValue, + UserDefinedValues + ) + } + end + end, + ExistingZones + ), + persistent_term:put(?PERSIS_KEY(?CONF, zones), NewZones) + end, + NewRootValue. + +zone_global_defaults() -> + maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]). + +-spec is_zone_root(atom) -> boolean(). +is_zone_root(Name) -> + lists:member(Name, zone_roots()). + -spec zone_roots() -> [atom()]. zone_roots() -> lists:map(fun list_to_atom/1, emqx_zone_schema:roots()). + +%%% +%%% @doc During init, ensure order of puts that zone is put after the other global defaults. +%%% +put_with_order(#{zones := _Zones} = Conf) -> + put1(maps:without([zones], Conf)), + put1(maps:with([zones], Conf)); +put_with_order(Conf) -> + put1(Conf). + +%% +%% @doc Helper function that converts raw conf val to runtime conf val +%% with the types info from schema module +-spec rawconf_to_conf(module(), RawPath :: [binary()], RawValue :: term()) -> term(). +rawconf_to_conf(SchemaModule, RawPath, RawValue) -> + {_, RawUserDefinedValues} = + check_config( + SchemaModule, + emqx_utils_maps:deep_put(RawPath, #{}, RawValue) + ), + AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}), + emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues). diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 4f034676f..050a4f22c 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -107,25 +107,32 @@ t_init_load_emqx_schema(Config) -> ?assertEqual(ok, emqx_config:init_load(emqx_schema)), %% Then default zone is injected with all global defaults Default = emqx_config:get([zones, default]), + MQTT = emqx_config:get([mqtt]), + Stats = emqx_config:get([stats]), + FD = emqx_config:get([flapping_detect]), + FS = emqx_config:get([force_shutdown]), + CC = emqx_config:get([conn_congestion]), + FG = emqx_config:get([force_gc]), + OP = emqx_config:get([overload_protection]), ?assertMatch( #{ - mqtt := _, - stats := _, - flapping_detect := _, - force_shutdown := _, - conn_congestion := _, - force_gc := _, - overload_protection := _ + mqtt := MQTT, + stats := Stats, + flapping_detect := FD, + force_shutdown := FS, + conn_congestion := CC, + force_gc := FG, + overload_protection := OP }, Default ). -t_init_zones_load_emqx_schema_no_default(Config) -> +t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) -> emqx_config:erase_all(), %% Given empty config file ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), application:set_env(emqx, config_files, [ConfFile]), - %% When load emqx_schema + %% When emqx_schema is loaded ?assertEqual(ok, emqx_config:init_load(emqx_schema)), %% Then read for none existing zone should throw error ?assertError( @@ -138,9 +145,14 @@ t_init_zones_load_other_schema(Config) -> %% Given empty config file ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), application:set_env(emqx, config_files, [ConfFile]), - %% When load schema other than emqx_schema + %% When load emqx_limiter_schema, not emqx_schema %% Then load should success ?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)), + %% Then no zones is loaded. + ?assertError( + {config_not_found, [zones]}, + emqx_config:get([zones]) + ), %% Then no default zone is loaded. ?assertError( {config_not_found, [zones, default]}, @@ -154,70 +166,15 @@ t_init_zones_with_user_defined_default_zone(Config) -> ?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config ), application:set_env(emqx, config_files, [ConfFile]), - %% When load schema + %% When schema is loaded ?assertEqual(ok, emqx_config:init_load(emqx_schema)), - %% Then user defined value is set and others are defaults - ?assertMatch( - #{ - conn_congestion := - #{enable_alarm := true, min_alarm_sustain_duration := 60000}, - flapping_detect := - #{ban_time := 300000, max_count := 15, window_time := disabled}, - force_gc := - #{bytes := 16777216, count := 16000, enable := true}, - force_shutdown := - #{ - enable := true, - max_heap_size := 4194304, - max_mailbox_size := 1000 - }, - mqtt := - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - %% <=== here! - max_topic_alias := 1024, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - retry_interval := 30000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true - }, - overload_protection := - #{ - backoff_delay := 1, - backoff_gc := false, - backoff_hibernation := true, - backoff_new_conn := true, - enable := false - }, - stats := #{enable := true} - }, - emqx_config:get([zones, default]) - ). + %% Then user defined value is set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, default])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()), + ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others). t_init_zones_with_user_defined_other_zone(Config) -> emqx_config:erase_all(), @@ -226,270 +183,149 @@ t_init_zones_with_user_defined_other_zone(Config) -> ?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config ), application:set_env(emqx, config_files, [ConfFile]), - %% When load schema + %% When schema is loaded ?assertEqual(ok, emqx_config:init_load(emqx_schema)), %% Then user defined value is set and others are defaults - ?assertMatch( - #{ - conn_congestion := - #{enable_alarm := true, min_alarm_sustain_duration := 60000}, - flapping_detect := - #{ban_time := 300000, max_count := 15, window_time := disabled}, - force_gc := - #{bytes := 16777216, count := 16000, enable := true}, - force_shutdown := - #{ - enable := true, - max_heap_size := 4194304, - max_mailbox_size := 1000 - }, - mqtt := - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - %% <=== here! - max_topic_alias := 1024, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - retry_interval := 30000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true - }, - overload_protection := - #{ - backoff_delay := 1, - backoff_gc := false, - backoff_hibernation := true, - backoff_new_conn := true, - enable := false - }, - stats := #{enable := true} - }, - emqx_config:get([zones, myzone]) - ), + %% Then user defined value is set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()), + ?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others), %% Then default zone still have the defaults - ?assertMatch( - #{ - conn_congestion := - #{enable_alarm := true, min_alarm_sustain_duration := 60000}, - flapping_detect := - #{ban_time := 300000, max_count := 15, window_time := disabled}, - force_gc := - #{bytes := 16777216, count := 16000, enable := true}, - force_shutdown := - #{ - enable := true, - max_heap_size := 4194304, - max_mailbox_size := 1000 - }, - mqtt := - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - max_topic_alias := 65535, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - retry_interval := 30000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true - }, - overload_protection := - #{ - backoff_delay := 1, - backoff_gc := false, - backoff_hibernation := true, - backoff_new_conn := true, - enable := false - }, - stats := #{enable := true} - }, - emqx_config:get([zones, default]) - ). + ?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])). t_init_zones_with_cust_root_mqtt(Config) -> emqx_config:erase_all(), - %% Given user defined non default mqtt schema in config file - ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=600000">>, Config), + %% Given config file with mqtt user overrides + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config), application:set_env(emqx, config_files, [ConfFile]), %% When emqx_schema is loaded ?assertEqual(ok, emqx_config:init_load(emqx_schema)), - %% Then the value is reflected in default `zone' and other fields under mqtt are default. - ?assertMatch( - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - max_topic_alias := 65535, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - %% <=== here - retry_interval := 600000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true - }, + %% Then the value is reflected as internal representation in default `zone' + %% and other fields under mqtt are defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{retry_interval := 600000}, emqx_config:get([zones, default, mqtt]) ). t_default_zone_is_updated_after_global_defaults_updated(Config) -> emqx_config:erase_all(), - %% Given user defined non default mqtt schema in config file + %% Given empty emqx conf ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), application:set_env(emqx, config_files, [ConfFile]), ?assertEqual(ok, emqx_config:init_load(emqx_schema)), ?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])), %% When emqx_schema is loaded emqx_config:put([mqtt, retry_interval], 900000), - %% Then the value is reflected in default `zone' and other fields under mqtt are default. - ?assertMatch( - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - max_topic_alias := 65535, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - %% <=== here - retry_interval := 900000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true - }, + %% Then the value is reflected in default `zone' and other fields under mqtt are defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{retry_interval := 900000}, emqx_config:get([zones, default, mqtt]) ). -t_other_zone_is_updated_after_global_defaults_updated(Config) -> +t_myzone_is_updated_after_global_defaults_updated(Config) -> emqx_config:erase_all(), - %% Given user defined non default mqtt schema in config file + %% Given emqx conf file with user override in myzone (none default zone) ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config), application:set_env(emqx, config_files, [ConfFile]), ?assertEqual(ok, emqx_config:init_load(emqx_schema)), ?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])), - %% When emqx_schema is loaded + %% When update another value of global default emqx_config:put([mqtt, retry_interval], 900000), - %% Then the value is reflected in default `zone' and other fields under mqtt are default. - ?assertMatch( - #{ - await_rel_timeout := 300000, - exclusive_subscription := false, - idle_timeout := 15000, - ignore_loop_deliver := false, - keepalive_backoff := 0.75, - keepalive_multiplier := 1.5, - max_awaiting_rel := 100, - max_clientid_len := 65535, - max_inflight := 32, - max_mqueue_len := 1000, - max_packet_size := 1048576, - max_qos_allowed := 2, - max_subscriptions := infinity, - max_topic_alias := 65535, - max_topic_levels := 128, - mqueue_default_priority := lowest, - mqueue_priorities := disabled, - mqueue_store_qos0 := true, - peer_cert_as_clientid := disabled, - peer_cert_as_username := disabled, - response_information := [], - retain_available := true, - %% <=== here + %% Then the value is reflected in myzone and the user defined value unchanged. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{ retry_interval := 900000, - server_keepalive := disabled, - session_expiry_interval := 7200000, - shared_subscription := true, - strict_mode := false, - upgrade_qos := false, - use_username_as_clientid := false, - wildcard_subscription := true + max_inflight := 32 }, emqx_config:get([zones, myzone, mqtt]) + ), + %% Then the value is reflected in default zone as well. + ?assertEqual( + GDefaultMqtt#{retry_interval := 900000}, + emqx_config:get([zones, default, mqtt]) ). +t_zone_no_user_defined_overrides(Config) -> + emqx_config:erase_all(), + %% Given emqx conf file with user specified myzone + ConfFile = prepare_conf_file( + ?FUNCTION_NAME, <<"zones.myzone.mqtt.retry_interval=10m">>, Config + ), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 2), + %% Then the value is reflected in both default and myzone + ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])), + ?assertMatch(2, emqx_config:get([zones, myzone, mqtt, max_inflight])), + %% Then user defined value from config is not overwritten + ?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])). + +t_zone_no_user_defined_overrides_internal_represent(Config) -> + emqx_config:erase_all(), + %% Given emqx conf file with user specified myzone + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(1, emqx_config:get([zones, myzone, mqtt, max_inflight])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 2), + %% Then the value is reflected in default `zone' but not user-defined zone + ?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])), + ?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])). + +t_update_global_defaults_no_updates_on_user_overrides(Config) -> + emqx_config:erase_all(), + %% Given default zone config in conf file. + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + ?assertEqual(1, emqx_config:get([zones, default, mqtt, max_inflight])), + %% When there is an update in global default + emqx_config:put([mqtt, max_inflight], 20), + %% Then the value is not reflected in default `zone' + ?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])). + +t_zone_update_with_new_zone(Config) -> + emqx_config:erase_all(), + %% Given loaded an empty conf file + ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config), + application:set_env(emqx, config_files, [ConfFile]), + ?assertEqual(ok, emqx_config:init_load(emqx_schema)), + %% When there is an update for creating new zone config + ok = emqx_config:put([zones, myzone, mqtt, max_inflight], 2), + %% Then the value is set and other roots are created with defaults. + GDefaultMqtt = maps:get(mqtt, zone_global_defaults()), + ?assertEqual( + GDefaultMqtt#{max_inflight := 2}, + emqx_config:get([zones, myzone, mqtt]) + ). + +t_init_zone_with_global_defaults(_Config) -> + %% Given uninitialized empty config + emqx_config:erase_all(), + Zones = #{myzone => #{mqtt => #{max_inflight => 3}}}, + %% when put zones with global default with emqx_config:put/1 + GlobalDefaults = zone_global_defaults(), + AllConf = maps:put(zones, Zones, GlobalDefaults), + %% Then put sucess + ?assertEqual(ok, emqx_config:put(AllConf)), + %% Then GlobalDefaults are set + ?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))), + %% Then my zone and default zone are set + {MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])), + {ZGDMQTT, ExpectedOthers} = maps:take(mqtt, GlobalDefaults), + ?assertEqual(ZGDMQTT#{max_inflight := 3}, MqttV), + %% Then others are defaults + ?assertEqual(ExpectedOthers, Others). + %%% %%% Helpers %%% @@ -502,3 +338,61 @@ prepare_conf_file(Name, Content, CTConfig) -> tc_conf_file(TC, Config) -> DataDir = ?config(data_dir, Config), filename:join([DataDir, TC, 'emqx.conf']). + +zone_global_defaults() -> + #{ + conn_congestion => + #{enable_alarm => true, min_alarm_sustain_duration => 60000}, + flapping_detect => + #{ban_time => 300000, max_count => 15, window_time => disabled}, + force_gc => + #{bytes => 16777216, count => 16000, enable => true}, + force_shutdown => + #{ + enable => true, + max_heap_size => 4194304, + max_mailbox_size => 1000 + }, + mqtt => + #{ + await_rel_timeout => 300000, + exclusive_subscription => false, + idle_timeout => 15000, + ignore_loop_deliver => false, + keepalive_backoff => 0.75, + keepalive_multiplier => 1.5, + max_awaiting_rel => 100, + max_clientid_len => 65535, + max_inflight => 32, + max_mqueue_len => 1000, + max_packet_size => 1048576, + max_qos_allowed => 2, + max_subscriptions => infinity, + max_topic_alias => 65535, + max_topic_levels => 128, + mqueue_default_priority => lowest, + mqueue_priorities => disabled, + mqueue_store_qos0 => true, + peer_cert_as_clientid => disabled, + peer_cert_as_username => disabled, + response_information => [], + retain_available => true, + retry_interval => 30000, + server_keepalive => disabled, + session_expiry_interval => 7200000, + shared_subscription => true, + strict_mode => false, + upgrade_qos => false, + use_username_as_clientid => false, + wildcard_subscription => true + }, + overload_protection => + #{ + backoff_delay => 1, + backoff_gc => false, + backoff_hibernation => true, + backoff_new_conn => true, + enable => false + }, + stats => #{enable => true} + }. diff --git a/apps/emqx/test/emqx_takeover_SUITE.erl b/apps/emqx/test/emqx_takeover_SUITE.erl index 4adfa1feb..d5b36c2c3 100644 --- a/apps/emqx/test/emqx_takeover_SUITE.erl +++ b/apps/emqx/test/emqx_takeover_SUITE.erl @@ -33,6 +33,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + emqx_common_test_helpers:boot_modules(all), ?check_trace( ?wait_async_action( emqx_common_test_helpers:start_apps([]),