feat(config): merge with global defaults when put new zone
This commit is contained in:
parent
a2f8e87389
commit
923913e15c
|
@ -218,6 +218,9 @@ find_listener_conf(Type, Listener, KeyPath) ->
|
||||||
|
|
||||||
-spec put(map()) -> ok.
|
-spec put(map()) -> ok.
|
||||||
put(Config) ->
|
put(Config) ->
|
||||||
|
put_with_order(Config).
|
||||||
|
|
||||||
|
put1(Config) ->
|
||||||
maps:fold(
|
maps:fold(
|
||||||
fun(RootName, RootValue, _) ->
|
fun(RootName, RootValue, _) ->
|
||||||
?MODULE:put([atom(RootName)], RootValue)
|
?MODULE:put([atom(RootName)], RootValue)
|
||||||
|
@ -233,10 +236,8 @@ erase(RootName) ->
|
||||||
|
|
||||||
-spec put(emqx_utils_maps:config_key_path(), term()) -> ok.
|
-spec put(emqx_utils_maps:config_key_path(), term()) -> ok.
|
||||||
put(KeyPath, Config) ->
|
put(KeyPath, Config) ->
|
||||||
Putter = fun(Path, Map, Value0) ->
|
Putter = fun(_Path, Map, Value) ->
|
||||||
Value = emqx_utils_maps:deep_put(Path, Map, Value0),
|
maybe_update_zone(KeyPath, Map, Value)
|
||||||
maybe_update_zone(KeyPath, Value0),
|
|
||||||
Value
|
|
||||||
end,
|
end,
|
||||||
do_put(?CONF, Putter, KeyPath, Config).
|
do_put(?CONF, Putter, KeyPath, Config).
|
||||||
|
|
||||||
|
@ -329,7 +330,9 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
|
||||||
%% check configs against the schema
|
%% check configs against the schema
|
||||||
{AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}),
|
{AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}),
|
||||||
save_to_app_env(AppEnvs),
|
save_to_app_env(AppEnvs),
|
||||||
ok = save_to_config_map(CheckedConf, RawConf).
|
ok = save_to_config_map(CheckedConf, RawConf),
|
||||||
|
maybe_init_default_zone(),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% Merge environment variable overrides on top, then merge with overrides.
|
%% Merge environment variable overrides on top, then merge with overrides.
|
||||||
overlay_v0(SchemaMod, RawConf) when is_map(RawConf) ->
|
overlay_v0(SchemaMod, RawConf) when is_map(RawConf) ->
|
||||||
|
@ -586,15 +589,6 @@ save_to_app_env(AppEnvs0) ->
|
||||||
-spec save_to_config_map(config(), raw_config()) -> ok.
|
-spec save_to_config_map(config(), raw_config()) -> ok.
|
||||||
save_to_config_map(Conf, RawConf) ->
|
save_to_config_map(Conf, RawConf) ->
|
||||||
?MODULE:put(Conf),
|
?MODULE:put(Conf),
|
||||||
try emqx_config:get([zones]) of
|
|
||||||
Zones when is_map(Zones) ->
|
|
||||||
init_default_zone()
|
|
||||||
catch
|
|
||||||
error:{config_not_found, [zones]} ->
|
|
||||||
%% emqx schema is not loaded.
|
|
||||||
%% note, don't trust get_root_names/0
|
|
||||||
skip
|
|
||||||
end,
|
|
||||||
?MODULE:put_raw(RawConf).
|
?MODULE:put_raw(RawConf).
|
||||||
|
|
||||||
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
|
-spec save_to_override_conf(boolean(), raw_config(), update_opts()) -> ok | {error, term()}.
|
||||||
|
@ -796,63 +790,121 @@ to_atom_conf_path(Path, OnFail) ->
|
||||||
%%
|
%%
|
||||||
%% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults)
|
%% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults)
|
||||||
%% are written in the PV storage during emqx config loading/initialization.
|
%% are written in the PV storage during emqx config loading/initialization.
|
||||||
-spec init_default_zone() -> ok.
|
-spec maybe_init_default_zone() -> skip | ok.
|
||||||
init_default_zone() ->
|
maybe_init_default_zone() ->
|
||||||
|
case emqx_config:get([zones], ?CONFIG_NOT_FOUND_MAGIC) of
|
||||||
|
?CONFIG_NOT_FOUND_MAGIC ->
|
||||||
|
skip;
|
||||||
|
Zones0 when is_map(Zones0) ->
|
||||||
Zones =
|
Zones =
|
||||||
case ?MODULE:get([zones], #{}) of
|
case Zones0 of
|
||||||
#{default := _DefaultZone} = Z1 ->
|
#{default := _DefaultZone} = Z1 ->
|
||||||
Z1;
|
Z1;
|
||||||
Z2 ->
|
Z2 ->
|
||||||
Z2#{default => #{}}
|
Z2#{default => #{}}
|
||||||
end,
|
end,
|
||||||
GlobalDefaults = maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]),
|
GLD = zone_global_defaults(),
|
||||||
NewZones = maps:map(
|
NewZones = maps:map(
|
||||||
fun(_ZoneName, ZoneVal) ->
|
fun(_ZoneName, ZoneVal) ->
|
||||||
merge_with_global_defaults(ZoneVal, GlobalDefaults)
|
merge_with_global_defaults(GLD, ZoneVal)
|
||||||
end,
|
end,
|
||||||
Zones
|
Zones
|
||||||
),
|
),
|
||||||
?MODULE:put([zones], NewZones).
|
?MODULE:put([zones], NewZones)
|
||||||
|
end.
|
||||||
|
|
||||||
%% @TODO just use deep merge?
|
|
||||||
-spec merge_with_global_defaults(map(), map()) -> map().
|
-spec merge_with_global_defaults(map(), map()) -> map().
|
||||||
merge_with_global_defaults(Val, Defaults) ->
|
merge_with_global_defaults(GlobalDefaults, ZoneVal) ->
|
||||||
maps:fold(
|
emqx_utils_maps:deep_merge(GlobalDefaults, ZoneVal).
|
||||||
fun(K, V, Acc) ->
|
|
||||||
case maps:get(K, Acc, ?CONFIG_NOT_FOUND_MAGIC) of
|
|
||||||
?CONFIG_NOT_FOUND_MAGIC ->
|
|
||||||
%% Use the value of global default
|
|
||||||
Acc#{K => V};
|
|
||||||
Override ->
|
|
||||||
%% Merge with overrides
|
|
||||||
Acc#{K => emqx_utils_maps:deep_merge(V, Override)}
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
Val,
|
|
||||||
Defaults
|
|
||||||
).
|
|
||||||
|
|
||||||
%% @doc Update zones in case global defaults are changed.
|
%% @doc Update zones
|
||||||
-spec maybe_update_zone(runtime_config_key_path(), Val :: term()) -> skip | ok.
|
%% when 1) zone updates, return *new* zones
|
||||||
maybe_update_zone([], _Value) ->
|
%% when 2) zone global config updates, write to PT directly.
|
||||||
skip;
|
%% Zone global defaults are always presented in the configmap (PT) when updating zone
|
||||||
maybe_update_zone([RootName | _T] = Path, Value) ->
|
-spec maybe_update_zone(runtime_config_key_path(), RootValue :: map(), Val :: term()) ->
|
||||||
case lists:member(RootName, zone_roots()) of
|
NewZoneVal :: map().
|
||||||
|
maybe_update_zone([zones | T], ZonesValue, Value) ->
|
||||||
|
%% note, do not write to PT, return *New value* instead
|
||||||
|
NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value),
|
||||||
|
ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})),
|
||||||
|
%% Update only new zones with global defaults
|
||||||
|
GLD = zone_global_defaults(),
|
||||||
|
maps:fold(
|
||||||
|
fun(ZoneName, ZoneValue, Acc) ->
|
||||||
|
Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)}
|
||||||
|
end,
|
||||||
|
NewZonesValue,
|
||||||
|
maps:without(ExistingZoneNames, NewZonesValue)
|
||||||
|
);
|
||||||
|
maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) ->
|
||||||
|
NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value),
|
||||||
|
case is_zone_root(RootName) of
|
||||||
false ->
|
false ->
|
||||||
skip;
|
skip;
|
||||||
true ->
|
true ->
|
||||||
Zones = ?MODULE:get([zones], #{}),
|
%% When updates on global default roots.
|
||||||
|
ExistingZones = ?MODULE:get([zones], #{}),
|
||||||
|
RootNameBin = atom_to_binary(RootName),
|
||||||
NewZones = maps:map(
|
NewZones = maps:map(
|
||||||
fun(_ZoneName, ZoneVal) ->
|
fun(ZoneName, ZoneVal) ->
|
||||||
%% @TODO we should not overwrite if it is a user defined value
|
BinPath = [<<"zones">>, atom_to_binary(ZoneName), RootNameBin],
|
||||||
emqx_utils_maps:deep_put(Path, ZoneVal, Value)
|
case
|
||||||
end,
|
%% look for user defined value from RAWCONF
|
||||||
Zones
|
?MODULE:get_raw(
|
||||||
|
BinPath,
|
||||||
|
?CONFIG_NOT_FOUND_MAGIC
|
||||||
|
)
|
||||||
|
of
|
||||||
|
?CONFIG_NOT_FOUND_MAGIC ->
|
||||||
|
ZoneVal#{RootName => NewRootValue};
|
||||||
|
RawUserZoneRoot ->
|
||||||
|
UserDefinedValues = rawconf_to_conf(
|
||||||
|
emqx_schema, BinPath, RawUserZoneRoot
|
||||||
),
|
),
|
||||||
?MODULE:put([zones], NewZones),
|
ZoneVal#{
|
||||||
ok
|
RootName :=
|
||||||
end.
|
emqx_utils_maps:deep_merge(
|
||||||
|
NewRootValue,
|
||||||
|
UserDefinedValues
|
||||||
|
)
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
ExistingZones
|
||||||
|
),
|
||||||
|
persistent_term:put(?PERSIS_KEY(?CONF, zones), NewZones)
|
||||||
|
end,
|
||||||
|
NewRootValue.
|
||||||
|
|
||||||
|
zone_global_defaults() ->
|
||||||
|
maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]).
|
||||||
|
|
||||||
|
-spec is_zone_root(atom) -> boolean().
|
||||||
|
is_zone_root(Name) ->
|
||||||
|
lists:member(Name, zone_roots()).
|
||||||
|
|
||||||
-spec zone_roots() -> [atom()].
|
-spec zone_roots() -> [atom()].
|
||||||
zone_roots() ->
|
zone_roots() ->
|
||||||
lists:map(fun list_to_atom/1, emqx_zone_schema:roots()).
|
lists:map(fun list_to_atom/1, emqx_zone_schema:roots()).
|
||||||
|
|
||||||
|
%%%
|
||||||
|
%%% @doc During init, ensure order of puts that zone is put after the other global defaults.
|
||||||
|
%%%
|
||||||
|
put_with_order(#{zones := _Zones} = Conf) ->
|
||||||
|
put1(maps:without([zones], Conf)),
|
||||||
|
put1(maps:with([zones], Conf));
|
||||||
|
put_with_order(Conf) ->
|
||||||
|
put1(Conf).
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% @doc Helper function that converts raw conf val to runtime conf val
|
||||||
|
%% with the types info from schema module
|
||||||
|
-spec rawconf_to_conf(module(), RawPath :: [binary()], RawValue :: term()) -> term().
|
||||||
|
rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
|
||||||
|
{_, RawUserDefinedValues} =
|
||||||
|
check_config(
|
||||||
|
SchemaModule,
|
||||||
|
emqx_utils_maps:deep_put(RawPath, #{}, RawValue)
|
||||||
|
),
|
||||||
|
AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}),
|
||||||
|
emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).
|
||||||
|
|
|
@ -107,25 +107,32 @@ t_init_load_emqx_schema(Config) ->
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
%% Then default zone is injected with all global defaults
|
%% Then default zone is injected with all global defaults
|
||||||
Default = emqx_config:get([zones, default]),
|
Default = emqx_config:get([zones, default]),
|
||||||
|
MQTT = emqx_config:get([mqtt]),
|
||||||
|
Stats = emqx_config:get([stats]),
|
||||||
|
FD = emqx_config:get([flapping_detect]),
|
||||||
|
FS = emqx_config:get([force_shutdown]),
|
||||||
|
CC = emqx_config:get([conn_congestion]),
|
||||||
|
FG = emqx_config:get([force_gc]),
|
||||||
|
OP = emqx_config:get([overload_protection]),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
#{
|
||||||
mqtt := _,
|
mqtt := MQTT,
|
||||||
stats := _,
|
stats := Stats,
|
||||||
flapping_detect := _,
|
flapping_detect := FD,
|
||||||
force_shutdown := _,
|
force_shutdown := FS,
|
||||||
conn_congestion := _,
|
conn_congestion := CC,
|
||||||
force_gc := _,
|
force_gc := FG,
|
||||||
overload_protection := _
|
overload_protection := OP
|
||||||
},
|
},
|
||||||
Default
|
Default
|
||||||
).
|
).
|
||||||
|
|
||||||
t_init_zones_load_emqx_schema_no_default(Config) ->
|
t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) ->
|
||||||
emqx_config:erase_all(),
|
emqx_config:erase_all(),
|
||||||
%% Given empty config file
|
%% Given empty config file
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
%% When load emqx_schema
|
%% When emqx_schema is loaded
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
%% Then read for none existing zone should throw error
|
%% Then read for none existing zone should throw error
|
||||||
?assertError(
|
?assertError(
|
||||||
|
@ -138,9 +145,14 @@ t_init_zones_load_other_schema(Config) ->
|
||||||
%% Given empty config file
|
%% Given empty config file
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
%% When load schema other than emqx_schema
|
%% When load emqx_limiter_schema, not emqx_schema
|
||||||
%% Then load should success
|
%% Then load should success
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)),
|
||||||
|
%% Then no zones is loaded.
|
||||||
|
?assertError(
|
||||||
|
{config_not_found, [zones]},
|
||||||
|
emqx_config:get([zones])
|
||||||
|
),
|
||||||
%% Then no default zone is loaded.
|
%% Then no default zone is loaded.
|
||||||
?assertError(
|
?assertError(
|
||||||
{config_not_found, [zones, default]},
|
{config_not_found, [zones, default]},
|
||||||
|
@ -154,70 +166,15 @@ t_init_zones_with_user_defined_default_zone(Config) ->
|
||||||
?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config
|
?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config
|
||||||
),
|
),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
%% When load schema
|
%% When schema is loaded
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
%% Then user defined value is set and others are defaults
|
|
||||||
|
|
||||||
?assertMatch(
|
%% Then user defined value is set
|
||||||
#{
|
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, default])),
|
||||||
conn_congestion :=
|
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
|
||||||
#{enable_alarm := true, min_alarm_sustain_duration := 60000},
|
?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
|
||||||
flapping_detect :=
|
%% Then others are defaults
|
||||||
#{ban_time := 300000, max_count := 15, window_time := disabled},
|
?assertEqual(ExpectedOthers, Others).
|
||||||
force_gc :=
|
|
||||||
#{bytes := 16777216, count := 16000, enable := true},
|
|
||||||
force_shutdown :=
|
|
||||||
#{
|
|
||||||
enable := true,
|
|
||||||
max_heap_size := 4194304,
|
|
||||||
max_mailbox_size := 1000
|
|
||||||
},
|
|
||||||
mqtt :=
|
|
||||||
#{
|
|
||||||
await_rel_timeout := 300000,
|
|
||||||
exclusive_subscription := false,
|
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
%% <=== here!
|
|
||||||
max_topic_alias := 1024,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
retry_interval := 30000,
|
|
||||||
server_keepalive := disabled,
|
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
|
||||||
overload_protection :=
|
|
||||||
#{
|
|
||||||
backoff_delay := 1,
|
|
||||||
backoff_gc := false,
|
|
||||||
backoff_hibernation := true,
|
|
||||||
backoff_new_conn := true,
|
|
||||||
enable := false
|
|
||||||
},
|
|
||||||
stats := #{enable := true}
|
|
||||||
},
|
|
||||||
emqx_config:get([zones, default])
|
|
||||||
).
|
|
||||||
|
|
||||||
t_init_zones_with_user_defined_other_zone(Config) ->
|
t_init_zones_with_user_defined_other_zone(Config) ->
|
||||||
emqx_config:erase_all(),
|
emqx_config:erase_all(),
|
||||||
|
@ -226,270 +183,149 @@ t_init_zones_with_user_defined_other_zone(Config) ->
|
||||||
?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config
|
?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config
|
||||||
),
|
),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
%% When load schema
|
%% When schema is loaded
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
%% Then user defined value is set and others are defaults
|
%% Then user defined value is set and others are defaults
|
||||||
?assertMatch(
|
|
||||||
#{
|
|
||||||
conn_congestion :=
|
|
||||||
#{enable_alarm := true, min_alarm_sustain_duration := 60000},
|
|
||||||
flapping_detect :=
|
|
||||||
#{ban_time := 300000, max_count := 15, window_time := disabled},
|
|
||||||
force_gc :=
|
|
||||||
#{bytes := 16777216, count := 16000, enable := true},
|
|
||||||
force_shutdown :=
|
|
||||||
#{
|
|
||||||
enable := true,
|
|
||||||
max_heap_size := 4194304,
|
|
||||||
max_mailbox_size := 1000
|
|
||||||
},
|
|
||||||
mqtt :=
|
|
||||||
#{
|
|
||||||
await_rel_timeout := 300000,
|
|
||||||
exclusive_subscription := false,
|
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
%% <=== here!
|
|
||||||
max_topic_alias := 1024,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
retry_interval := 30000,
|
|
||||||
server_keepalive := disabled,
|
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
|
||||||
overload_protection :=
|
|
||||||
#{
|
|
||||||
backoff_delay := 1,
|
|
||||||
backoff_gc := false,
|
|
||||||
backoff_hibernation := true,
|
|
||||||
backoff_new_conn := true,
|
|
||||||
enable := false
|
|
||||||
},
|
|
||||||
stats := #{enable := true}
|
|
||||||
},
|
|
||||||
emqx_config:get([zones, myzone])
|
|
||||||
),
|
|
||||||
|
|
||||||
|
%% Then user defined value is set
|
||||||
|
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
|
||||||
|
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
|
||||||
|
?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
|
||||||
|
%% Then others are defaults
|
||||||
|
?assertEqual(ExpectedOthers, Others),
|
||||||
%% Then default zone still have the defaults
|
%% Then default zone still have the defaults
|
||||||
?assertMatch(
|
?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])).
|
||||||
#{
|
|
||||||
conn_congestion :=
|
|
||||||
#{enable_alarm := true, min_alarm_sustain_duration := 60000},
|
|
||||||
flapping_detect :=
|
|
||||||
#{ban_time := 300000, max_count := 15, window_time := disabled},
|
|
||||||
force_gc :=
|
|
||||||
#{bytes := 16777216, count := 16000, enable := true},
|
|
||||||
force_shutdown :=
|
|
||||||
#{
|
|
||||||
enable := true,
|
|
||||||
max_heap_size := 4194304,
|
|
||||||
max_mailbox_size := 1000
|
|
||||||
},
|
|
||||||
mqtt :=
|
|
||||||
#{
|
|
||||||
await_rel_timeout := 300000,
|
|
||||||
exclusive_subscription := false,
|
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
max_topic_alias := 65535,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
retry_interval := 30000,
|
|
||||||
server_keepalive := disabled,
|
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
|
||||||
overload_protection :=
|
|
||||||
#{
|
|
||||||
backoff_delay := 1,
|
|
||||||
backoff_gc := false,
|
|
||||||
backoff_hibernation := true,
|
|
||||||
backoff_new_conn := true,
|
|
||||||
enable := false
|
|
||||||
},
|
|
||||||
stats := #{enable := true}
|
|
||||||
},
|
|
||||||
emqx_config:get([zones, default])
|
|
||||||
).
|
|
||||||
|
|
||||||
t_init_zones_with_cust_root_mqtt(Config) ->
|
t_init_zones_with_cust_root_mqtt(Config) ->
|
||||||
emqx_config:erase_all(),
|
emqx_config:erase_all(),
|
||||||
%% Given user defined non default mqtt schema in config file
|
%% Given config file with mqtt user overrides
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=600000">>, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
%% When emqx_schema is loaded
|
%% When emqx_schema is loaded
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
%% Then the value is reflected in default `zone' and other fields under mqtt are default.
|
%% Then the value is reflected as internal representation in default `zone'
|
||||||
?assertMatch(
|
%% and other fields under mqtt are defaults.
|
||||||
#{
|
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||||
await_rel_timeout := 300000,
|
?assertEqual(
|
||||||
exclusive_subscription := false,
|
GDefaultMqtt#{retry_interval := 600000},
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
max_topic_alias := 65535,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
%% <=== here
|
|
||||||
retry_interval := 600000,
|
|
||||||
server_keepalive := disabled,
|
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
|
||||||
emqx_config:get([zones, default, mqtt])
|
emqx_config:get([zones, default, mqtt])
|
||||||
).
|
).
|
||||||
|
|
||||||
t_default_zone_is_updated_after_global_defaults_updated(Config) ->
|
t_default_zone_is_updated_after_global_defaults_updated(Config) ->
|
||||||
emqx_config:erase_all(),
|
emqx_config:erase_all(),
|
||||||
%% Given user defined non default mqtt schema in config file
|
%% Given empty emqx conf
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])),
|
?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])),
|
||||||
%% When emqx_schema is loaded
|
%% When emqx_schema is loaded
|
||||||
emqx_config:put([mqtt, retry_interval], 900000),
|
emqx_config:put([mqtt, retry_interval], 900000),
|
||||||
%% Then the value is reflected in default `zone' and other fields under mqtt are default.
|
%% Then the value is reflected in default `zone' and other fields under mqtt are defaults.
|
||||||
?assertMatch(
|
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||||
#{
|
?assertEqual(
|
||||||
await_rel_timeout := 300000,
|
GDefaultMqtt#{retry_interval := 900000},
|
||||||
exclusive_subscription := false,
|
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
max_topic_alias := 65535,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
%% <=== here
|
|
||||||
retry_interval := 900000,
|
|
||||||
server_keepalive := disabled,
|
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
|
||||||
emqx_config:get([zones, default, mqtt])
|
emqx_config:get([zones, default, mqtt])
|
||||||
).
|
).
|
||||||
|
|
||||||
t_other_zone_is_updated_after_global_defaults_updated(Config) ->
|
t_myzone_is_updated_after_global_defaults_updated(Config) ->
|
||||||
emqx_config:erase_all(),
|
emqx_config:erase_all(),
|
||||||
%% Given user defined non default mqtt schema in config file
|
%% Given emqx conf file with user override in myzone (none default zone)
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config),
|
||||||
application:set_env(emqx, config_files, [ConfFile]),
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
|
?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
|
||||||
%% When emqx_schema is loaded
|
%% When update another value of global default
|
||||||
emqx_config:put([mqtt, retry_interval], 900000),
|
emqx_config:put([mqtt, retry_interval], 900000),
|
||||||
%% Then the value is reflected in default `zone' and other fields under mqtt are default.
|
%% Then the value is reflected in myzone and the user defined value unchanged.
|
||||||
?assertMatch(
|
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||||
#{
|
?assertEqual(
|
||||||
await_rel_timeout := 300000,
|
GDefaultMqtt#{
|
||||||
exclusive_subscription := false,
|
|
||||||
idle_timeout := 15000,
|
|
||||||
ignore_loop_deliver := false,
|
|
||||||
keepalive_backoff := 0.75,
|
|
||||||
keepalive_multiplier := 1.5,
|
|
||||||
max_awaiting_rel := 100,
|
|
||||||
max_clientid_len := 65535,
|
|
||||||
max_inflight := 32,
|
|
||||||
max_mqueue_len := 1000,
|
|
||||||
max_packet_size := 1048576,
|
|
||||||
max_qos_allowed := 2,
|
|
||||||
max_subscriptions := infinity,
|
|
||||||
max_topic_alias := 65535,
|
|
||||||
max_topic_levels := 128,
|
|
||||||
mqueue_default_priority := lowest,
|
|
||||||
mqueue_priorities := disabled,
|
|
||||||
mqueue_store_qos0 := true,
|
|
||||||
peer_cert_as_clientid := disabled,
|
|
||||||
peer_cert_as_username := disabled,
|
|
||||||
response_information := [],
|
|
||||||
retain_available := true,
|
|
||||||
%% <=== here
|
|
||||||
retry_interval := 900000,
|
retry_interval := 900000,
|
||||||
server_keepalive := disabled,
|
max_inflight := 32
|
||||||
session_expiry_interval := 7200000,
|
|
||||||
shared_subscription := true,
|
|
||||||
strict_mode := false,
|
|
||||||
upgrade_qos := false,
|
|
||||||
use_username_as_clientid := false,
|
|
||||||
wildcard_subscription := true
|
|
||||||
},
|
},
|
||||||
emqx_config:get([zones, myzone, mqtt])
|
emqx_config:get([zones, myzone, mqtt])
|
||||||
|
),
|
||||||
|
%% Then the value is reflected in default zone as well.
|
||||||
|
?assertEqual(
|
||||||
|
GDefaultMqtt#{retry_interval := 900000},
|
||||||
|
emqx_config:get([zones, default, mqtt])
|
||||||
).
|
).
|
||||||
|
|
||||||
|
t_zone_no_user_defined_overrides(Config) ->
|
||||||
|
emqx_config:erase_all(),
|
||||||
|
%% Given emqx conf file with user specified myzone
|
||||||
|
ConfFile = prepare_conf_file(
|
||||||
|
?FUNCTION_NAME, <<"zones.myzone.mqtt.retry_interval=10m">>, Config
|
||||||
|
),
|
||||||
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
|
?assertEqual(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
|
||||||
|
%% When there is an update in global default
|
||||||
|
emqx_config:put([mqtt, max_inflight], 2),
|
||||||
|
%% Then the value is reflected in both default and myzone
|
||||||
|
?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||||
|
?assertMatch(2, emqx_config:get([zones, myzone, mqtt, max_inflight])),
|
||||||
|
%% Then user defined value from config is not overwritten
|
||||||
|
?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])).
|
||||||
|
|
||||||
|
t_zone_no_user_defined_overrides_internal_represent(Config) ->
|
||||||
|
emqx_config:erase_all(),
|
||||||
|
%% Given emqx conf file with user specified myzone
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config),
|
||||||
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
|
?assertEqual(1, emqx_config:get([zones, myzone, mqtt, max_inflight])),
|
||||||
|
%% When there is an update in global default
|
||||||
|
emqx_config:put([mqtt, max_inflight], 2),
|
||||||
|
%% Then the value is reflected in default `zone' but not user-defined zone
|
||||||
|
?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||||
|
?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])).
|
||||||
|
|
||||||
|
t_update_global_defaults_no_updates_on_user_overrides(Config) ->
|
||||||
|
emqx_config:erase_all(),
|
||||||
|
%% Given default zone config in conf file.
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config),
|
||||||
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
|
?assertEqual(1, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||||
|
%% When there is an update in global default
|
||||||
|
emqx_config:put([mqtt, max_inflight], 20),
|
||||||
|
%% Then the value is not reflected in default `zone'
|
||||||
|
?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])).
|
||||||
|
|
||||||
|
t_zone_update_with_new_zone(Config) ->
|
||||||
|
emqx_config:erase_all(),
|
||||||
|
%% Given loaded an empty conf file
|
||||||
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||||
|
application:set_env(emqx, config_files, [ConfFile]),
|
||||||
|
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||||
|
%% When there is an update for creating new zone config
|
||||||
|
ok = emqx_config:put([zones, myzone, mqtt, max_inflight], 2),
|
||||||
|
%% Then the value is set and other roots are created with defaults.
|
||||||
|
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||||
|
?assertEqual(
|
||||||
|
GDefaultMqtt#{max_inflight := 2},
|
||||||
|
emqx_config:get([zones, myzone, mqtt])
|
||||||
|
).
|
||||||
|
|
||||||
|
t_init_zone_with_global_defaults(_Config) ->
|
||||||
|
%% Given uninitialized empty config
|
||||||
|
emqx_config:erase_all(),
|
||||||
|
Zones = #{myzone => #{mqtt => #{max_inflight => 3}}},
|
||||||
|
%% when put zones with global default with emqx_config:put/1
|
||||||
|
GlobalDefaults = zone_global_defaults(),
|
||||||
|
AllConf = maps:put(zones, Zones, GlobalDefaults),
|
||||||
|
%% Then put sucess
|
||||||
|
?assertEqual(ok, emqx_config:put(AllConf)),
|
||||||
|
%% Then GlobalDefaults are set
|
||||||
|
?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))),
|
||||||
|
%% Then my zone and default zone are set
|
||||||
|
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
|
||||||
|
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, GlobalDefaults),
|
||||||
|
?assertEqual(ZGDMQTT#{max_inflight := 3}, MqttV),
|
||||||
|
%% Then others are defaults
|
||||||
|
?assertEqual(ExpectedOthers, Others).
|
||||||
|
|
||||||
%%%
|
%%%
|
||||||
%%% Helpers
|
%%% Helpers
|
||||||
%%%
|
%%%
|
||||||
|
@ -502,3 +338,61 @@ prepare_conf_file(Name, Content, CTConfig) ->
|
||||||
tc_conf_file(TC, Config) ->
|
tc_conf_file(TC, Config) ->
|
||||||
DataDir = ?config(data_dir, Config),
|
DataDir = ?config(data_dir, Config),
|
||||||
filename:join([DataDir, TC, 'emqx.conf']).
|
filename:join([DataDir, TC, 'emqx.conf']).
|
||||||
|
|
||||||
|
zone_global_defaults() ->
|
||||||
|
#{
|
||||||
|
conn_congestion =>
|
||||||
|
#{enable_alarm => true, min_alarm_sustain_duration => 60000},
|
||||||
|
flapping_detect =>
|
||||||
|
#{ban_time => 300000, max_count => 15, window_time => disabled},
|
||||||
|
force_gc =>
|
||||||
|
#{bytes => 16777216, count => 16000, enable => true},
|
||||||
|
force_shutdown =>
|
||||||
|
#{
|
||||||
|
enable => true,
|
||||||
|
max_heap_size => 4194304,
|
||||||
|
max_mailbox_size => 1000
|
||||||
|
},
|
||||||
|
mqtt =>
|
||||||
|
#{
|
||||||
|
await_rel_timeout => 300000,
|
||||||
|
exclusive_subscription => false,
|
||||||
|
idle_timeout => 15000,
|
||||||
|
ignore_loop_deliver => false,
|
||||||
|
keepalive_backoff => 0.75,
|
||||||
|
keepalive_multiplier => 1.5,
|
||||||
|
max_awaiting_rel => 100,
|
||||||
|
max_clientid_len => 65535,
|
||||||
|
max_inflight => 32,
|
||||||
|
max_mqueue_len => 1000,
|
||||||
|
max_packet_size => 1048576,
|
||||||
|
max_qos_allowed => 2,
|
||||||
|
max_subscriptions => infinity,
|
||||||
|
max_topic_alias => 65535,
|
||||||
|
max_topic_levels => 128,
|
||||||
|
mqueue_default_priority => lowest,
|
||||||
|
mqueue_priorities => disabled,
|
||||||
|
mqueue_store_qos0 => true,
|
||||||
|
peer_cert_as_clientid => disabled,
|
||||||
|
peer_cert_as_username => disabled,
|
||||||
|
response_information => [],
|
||||||
|
retain_available => true,
|
||||||
|
retry_interval => 30000,
|
||||||
|
server_keepalive => disabled,
|
||||||
|
session_expiry_interval => 7200000,
|
||||||
|
shared_subscription => true,
|
||||||
|
strict_mode => false,
|
||||||
|
upgrade_qos => false,
|
||||||
|
use_username_as_clientid => false,
|
||||||
|
wildcard_subscription => true
|
||||||
|
},
|
||||||
|
overload_protection =>
|
||||||
|
#{
|
||||||
|
backoff_delay => 1,
|
||||||
|
backoff_gc => false,
|
||||||
|
backoff_hibernation => true,
|
||||||
|
backoff_new_conn => true,
|
||||||
|
enable => false
|
||||||
|
},
|
||||||
|
stats => #{enable => true}
|
||||||
|
}.
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
emqx_common_test_helpers:start_apps([]),
|
emqx_common_test_helpers:start_apps([]),
|
||||||
|
|
Loading…
Reference in New Issue