Merge pull request #10790 from qzhuyan/perf/william/explicit-default-zone
perf(config): read zone conf once
This commit is contained in:
commit
34958cef0a
|
@ -71,10 +71,10 @@ jobs:
|
|||
./rebar3 xref
|
||||
./rebar3 dialyzer
|
||||
./rebar3 eunit -v
|
||||
./rebar3 ct -v --readable=true
|
||||
./rebar3 ct --name 'test@127.0.0.1' -v --readable=true
|
||||
./rebar3 proper -d test/props
|
||||
- uses: actions/upload-artifact@v3
|
||||
if: failure()
|
||||
with:
|
||||
name: logs
|
||||
name: logs-${{ matrix.runs-on }}
|
||||
path: apps/emqx/_build/test/logs
|
||||
|
|
|
@ -189,23 +189,11 @@ find_raw(KeyPath) ->
|
|||
|
||||
-spec get_zone_conf(atom(), emqx_utils_maps:config_key_path()) -> term().
|
||||
get_zone_conf(Zone, KeyPath) ->
|
||||
case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
|
||||
%% not found in zones, try to find the global config
|
||||
{not_found, _, _} ->
|
||||
?MODULE:get(KeyPath);
|
||||
{ok, Value} ->
|
||||
Value
|
||||
end.
|
||||
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)).
|
||||
|
||||
-spec get_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> term().
|
||||
get_zone_conf(Zone, KeyPath, Default) ->
|
||||
case find(?ZONE_CONF_PATH(Zone, KeyPath)) of
|
||||
%% not found in zones, try to find the global config
|
||||
{not_found, _, _} ->
|
||||
?MODULE:get(KeyPath, Default);
|
||||
{ok, Value} ->
|
||||
Value
|
||||
end.
|
||||
?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default).
|
||||
|
||||
-spec put_zone_conf(atom(), emqx_utils_maps:config_key_path(), term()) -> ok.
|
||||
put_zone_conf(Zone, KeyPath, Conf) ->
|
||||
|
@ -230,6 +218,9 @@ find_listener_conf(Type, Listener, KeyPath) ->
|
|||
|
||||
-spec put(map()) -> ok.
|
||||
put(Config) ->
|
||||
put_with_order(Config).
|
||||
|
||||
put1(Config) ->
|
||||
maps:fold(
|
||||
fun(RootName, RootValue, _) ->
|
||||
?MODULE:put([atom(RootName)], RootValue)
|
||||
|
@ -245,8 +236,8 @@ erase(RootName) ->
|
|||
|
||||
-spec put(emqx_utils_maps:config_key_path(), term()) -> ok.
|
||||
put(KeyPath, Config) ->
|
||||
Putter = fun(Path, Map, Value) ->
|
||||
emqx_utils_maps:deep_put(Path, Map, Value)
|
||||
Putter = fun(_Path, Map, Value) ->
|
||||
maybe_update_zone(KeyPath, Map, Value)
|
||||
end,
|
||||
do_put(?CONF, Putter, KeyPath, Config).
|
||||
|
||||
|
@ -339,7 +330,9 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
|
|||
%% check configs against the schema
|
||||
{AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}),
|
||||
save_to_app_env(AppEnvs),
|
||||
ok = save_to_config_map(CheckedConf, RawConf).
|
||||
ok = save_to_config_map(CheckedConf, RawConf),
|
||||
maybe_init_default_zone(),
|
||||
ok.
|
||||
|
||||
%% Merge environment variable overrides on top, then merge with overrides.
|
||||
overlay_v0(SchemaMod, RawConf) when is_map(RawConf) ->
|
||||
|
@ -788,3 +781,130 @@ to_atom_conf_path(Path, OnFail) ->
|
|||
V
|
||||
end
|
||||
end.
|
||||
|
||||
%% @doc Init zones under root `zones'
|
||||
%% 1. ensure one `default' zone as it is referenced by listeners.
|
||||
%% if default zone is unset, clone all default values from `GlobalDefaults'
|
||||
%% if default zone is set, values are merged with `GlobalDefaults'
|
||||
%% 2. For any user defined zones, merge with `GlobalDefaults'
|
||||
%%
|
||||
%% note1, this should be called as post action after emqx_config terms (zones, and GlobalDefaults)
|
||||
%% are written in the PV storage during emqx config loading/initialization.
|
||||
-spec maybe_init_default_zone() -> skip | ok.
|
||||
maybe_init_default_zone() ->
|
||||
case emqx_config:get([zones], ?CONFIG_NOT_FOUND_MAGIC) of
|
||||
?CONFIG_NOT_FOUND_MAGIC ->
|
||||
skip;
|
||||
Zones0 when is_map(Zones0) ->
|
||||
Zones =
|
||||
case Zones0 of
|
||||
#{default := _DefaultZone} = Z1 ->
|
||||
Z1;
|
||||
Z2 ->
|
||||
Z2#{default => #{}}
|
||||
end,
|
||||
GLD = zone_global_defaults(),
|
||||
NewZones = maps:map(
|
||||
fun(_ZoneName, ZoneVal) ->
|
||||
merge_with_global_defaults(GLD, ZoneVal)
|
||||
end,
|
||||
Zones
|
||||
),
|
||||
?MODULE:put([zones], NewZones)
|
||||
end.
|
||||
|
||||
-spec merge_with_global_defaults(map(), map()) -> map().
|
||||
merge_with_global_defaults(GlobalDefaults, ZoneVal) ->
|
||||
emqx_utils_maps:deep_merge(GlobalDefaults, ZoneVal).
|
||||
|
||||
%% @doc Update zones
|
||||
%% when 1) zone updates, return *new* zones
|
||||
%% when 2) zone global config updates, write to PT directly.
|
||||
%% Zone global defaults are always presented in the configmap (PT) when updating zone
|
||||
-spec maybe_update_zone(runtime_config_key_path(), RootValue :: map(), Val :: term()) ->
|
||||
NewZoneVal :: map().
|
||||
maybe_update_zone([zones | T], ZonesValue, Value) ->
|
||||
%% note, do not write to PT, return *New value* instead
|
||||
NewZonesValue = emqx_utils_maps:deep_put(T, ZonesValue, Value),
|
||||
ExistingZoneNames = maps:keys(?MODULE:get([zones], #{})),
|
||||
%% Update only new zones with global defaults
|
||||
GLD = zone_global_defaults(),
|
||||
maps:fold(
|
||||
fun(ZoneName, ZoneValue, Acc) ->
|
||||
Acc#{ZoneName := merge_with_global_defaults(GLD, ZoneValue)}
|
||||
end,
|
||||
NewZonesValue,
|
||||
maps:without(ExistingZoneNames, NewZonesValue)
|
||||
);
|
||||
maybe_update_zone([RootName | T], RootValue, Value) when is_atom(RootName) ->
|
||||
NewRootValue = emqx_utils_maps:deep_put(T, RootValue, Value),
|
||||
case is_zone_root(RootName) of
|
||||
false ->
|
||||
skip;
|
||||
true ->
|
||||
%% When updates on global default roots.
|
||||
ExistingZones = ?MODULE:get([zones], #{}),
|
||||
RootNameBin = atom_to_binary(RootName),
|
||||
NewZones = maps:map(
|
||||
fun(ZoneName, ZoneVal) ->
|
||||
BinPath = [<<"zones">>, atom_to_binary(ZoneName), RootNameBin],
|
||||
case
|
||||
%% look for user defined value from RAWCONF
|
||||
?MODULE:get_raw(
|
||||
BinPath,
|
||||
?CONFIG_NOT_FOUND_MAGIC
|
||||
)
|
||||
of
|
||||
?CONFIG_NOT_FOUND_MAGIC ->
|
||||
ZoneVal#{RootName => NewRootValue};
|
||||
RawUserZoneRoot ->
|
||||
UserDefinedValues = rawconf_to_conf(
|
||||
emqx_schema, BinPath, RawUserZoneRoot
|
||||
),
|
||||
ZoneVal#{
|
||||
RootName :=
|
||||
emqx_utils_maps:deep_merge(
|
||||
NewRootValue,
|
||||
UserDefinedValues
|
||||
)
|
||||
}
|
||||
end
|
||||
end,
|
||||
ExistingZones
|
||||
),
|
||||
persistent_term:put(?PERSIS_KEY(?CONF, zones), NewZones)
|
||||
end,
|
||||
NewRootValue.
|
||||
|
||||
zone_global_defaults() ->
|
||||
maps:from_list([{K, ?MODULE:get([K])} || K <- zone_roots()]).
|
||||
|
||||
-spec is_zone_root(atom) -> boolean().
|
||||
is_zone_root(Name) ->
|
||||
lists:member(Name, zone_roots()).
|
||||
|
||||
-spec zone_roots() -> [atom()].
|
||||
zone_roots() ->
|
||||
lists:map(fun list_to_atom/1, emqx_zone_schema:roots()).
|
||||
|
||||
%%%
|
||||
%%% @doc During init, ensure order of puts that zone is put after the other global defaults.
|
||||
%%%
|
||||
put_with_order(#{zones := _Zones} = Conf) ->
|
||||
put1(maps:without([zones], Conf)),
|
||||
put1(maps:with([zones], Conf));
|
||||
put_with_order(Conf) ->
|
||||
put1(Conf).
|
||||
|
||||
%%
|
||||
%% @doc Helper function that converts raw conf val to runtime conf val
|
||||
%% with the types info from schema module
|
||||
-spec rawconf_to_conf(module(), RawPath :: [binary()], RawValue :: term()) -> term().
|
||||
rawconf_to_conf(SchemaModule, RawPath, RawValue) ->
|
||||
{_, RawUserDefinedValues} =
|
||||
check_config(
|
||||
SchemaModule,
|
||||
emqx_utils_maps:deep_put(RawPath, #{}, RawValue)
|
||||
),
|
||||
AtomPath = to_atom_conf_path(RawPath, {raise_error, maybe_update_zone_error}),
|
||||
emqx_utils_maps:deep_get(AtomPath, RawUserDefinedValues).
|
||||
|
|
|
@ -41,8 +41,6 @@
|
|||
exclusive_subscription => boolean()
|
||||
}.
|
||||
|
||||
-define(MAX_TOPIC_LEVELS, 65535).
|
||||
|
||||
-define(PUBCAP_KEYS, [
|
||||
max_topic_levels,
|
||||
max_qos_allowed,
|
||||
|
@ -154,8 +152,5 @@ get_caps(Zone) ->
|
|||
get_caps(Keys, Zone) ->
|
||||
maps:with(
|
||||
Keys,
|
||||
maps:merge(
|
||||
emqx_config:get([mqtt]),
|
||||
emqx_config:get_zone_conf(Zone, [mqtt])
|
||||
)
|
||||
emqx_config:get_zone_conf(Zone, [mqtt])
|
||||
).
|
||||
|
|
|
@ -27,176 +27,6 @@
|
|||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
force_gc_conf() ->
|
||||
#{bytes => 16777216, count => 16000, enable => true}.
|
||||
|
||||
force_shutdown_conf() ->
|
||||
#{enable => true, max_heap_size => 4194304, max_mailbox_size => 1000}.
|
||||
|
||||
rpc_conf() ->
|
||||
#{
|
||||
async_batch_size => 256,
|
||||
authentication_timeout => 5000,
|
||||
call_receive_timeout => 15000,
|
||||
connect_timeout => 5000,
|
||||
mode => async,
|
||||
port_discovery => stateless,
|
||||
send_timeout => 5000,
|
||||
socket_buffer => 1048576,
|
||||
socket_keepalive_count => 9,
|
||||
socket_keepalive_idle => 900,
|
||||
socket_keepalive_interval => 75,
|
||||
socket_recbuf => 1048576,
|
||||
socket_sndbuf => 1048576,
|
||||
tcp_client_num => 1,
|
||||
tcp_server_port => 5369
|
||||
}.
|
||||
|
||||
mqtt_conf() ->
|
||||
#{
|
||||
await_rel_timeout => 300000,
|
||||
idle_timeout => 15000,
|
||||
ignore_loop_deliver => false,
|
||||
keepalive_backoff => 0.75,
|
||||
max_awaiting_rel => 100,
|
||||
max_clientid_len => 65535,
|
||||
max_inflight => 32,
|
||||
max_mqueue_len => 1000,
|
||||
max_packet_size => 1048576,
|
||||
max_qos_allowed => 2,
|
||||
max_subscriptions => infinity,
|
||||
max_topic_alias => 65535,
|
||||
max_topic_levels => 128,
|
||||
mqueue_default_priority => lowest,
|
||||
mqueue_priorities => disabled,
|
||||
mqueue_store_qos0 => true,
|
||||
peer_cert_as_clientid => disabled,
|
||||
peer_cert_as_username => disabled,
|
||||
response_information => [],
|
||||
retain_available => true,
|
||||
retry_interval => 30000,
|
||||
server_keepalive => disabled,
|
||||
session_expiry_interval => 7200000,
|
||||
shared_subscription => true,
|
||||
strict_mode => false,
|
||||
upgrade_qos => false,
|
||||
use_username_as_clientid => false,
|
||||
wildcard_subscription => true
|
||||
}.
|
||||
|
||||
listener_mqtt_tcp_conf() ->
|
||||
#{
|
||||
acceptors => 16,
|
||||
zone => default,
|
||||
access_rules => ["allow all"],
|
||||
bind => {{0, 0, 0, 0}, 1883},
|
||||
max_connections => 1024000,
|
||||
mountpoint => <<>>,
|
||||
proxy_protocol => false,
|
||||
proxy_protocol_timeout => 3000,
|
||||
tcp_options => #{
|
||||
active_n => 100,
|
||||
backlog => 1024,
|
||||
buffer => 4096,
|
||||
high_watermark => 1048576,
|
||||
nodelay => false,
|
||||
reuseaddr => true,
|
||||
send_timeout => 15000,
|
||||
send_timeout_close => true
|
||||
}
|
||||
}.
|
||||
|
||||
listener_mqtt_ws_conf() ->
|
||||
#{
|
||||
acceptors => 16,
|
||||
zone => default,
|
||||
access_rules => ["allow all"],
|
||||
bind => {{0, 0, 0, 0}, 8083},
|
||||
max_connections => 1024000,
|
||||
mountpoint => <<>>,
|
||||
proxy_protocol => false,
|
||||
proxy_protocol_timeout => 3000,
|
||||
tcp_options =>
|
||||
#{
|
||||
active_n => 100,
|
||||
backlog => 1024,
|
||||
buffer => 4096,
|
||||
high_watermark => 1048576,
|
||||
nodelay => false,
|
||||
reuseaddr => true,
|
||||
send_timeout => 15000,
|
||||
send_timeout_close => true
|
||||
},
|
||||
websocket =>
|
||||
#{
|
||||
allow_origin_absence => true,
|
||||
check_origin_enable => false,
|
||||
check_origins => [],
|
||||
compress => false,
|
||||
deflate_opts =>
|
||||
#{
|
||||
client_max_window_bits => 15,
|
||||
mem_level => 8,
|
||||
server_max_window_bits => 15
|
||||
},
|
||||
fail_if_no_subprotocol => true,
|
||||
idle_timeout => 86400000,
|
||||
max_frame_size => infinity,
|
||||
mqtt_path => "/mqtt",
|
||||
mqtt_piggyback => multiple,
|
||||
% should allow uppercase in config
|
||||
proxy_address_header => "X-Forwarded-For",
|
||||
proxy_port_header => "x-forwarded-port",
|
||||
supported_subprotocols =>
|
||||
["mqtt", "mqtt-v3", "mqtt-v3.1.1", "mqtt-v5"]
|
||||
}
|
||||
}.
|
||||
|
||||
listeners_conf() ->
|
||||
#{
|
||||
tcp => #{default => listener_mqtt_tcp_conf()},
|
||||
ws => #{default => listener_mqtt_ws_conf()}
|
||||
}.
|
||||
|
||||
limiter_conf() ->
|
||||
Make = fun() ->
|
||||
#{
|
||||
burst => 0,
|
||||
rate => infinity
|
||||
}
|
||||
end,
|
||||
|
||||
lists:foldl(
|
||||
fun(Name, Acc) ->
|
||||
Acc#{Name => Make()}
|
||||
end,
|
||||
#{},
|
||||
[bytes, messages, message_routing, connection, internal]
|
||||
).
|
||||
|
||||
stats_conf() ->
|
||||
#{enable => true}.
|
||||
|
||||
zone_conf() ->
|
||||
#{}.
|
||||
|
||||
basic_conf() ->
|
||||
#{
|
||||
force_gc => force_gc_conf(),
|
||||
force_shutdown => force_shutdown_conf(),
|
||||
mqtt => mqtt_conf(),
|
||||
rpc => rpc_conf(),
|
||||
stats => stats_conf(),
|
||||
listeners => listeners_conf(),
|
||||
zones => zone_conf(),
|
||||
limiter => limiter_conf()
|
||||
}.
|
||||
|
||||
set_test_listener_confs() ->
|
||||
Conf = emqx_config:get([], #{}),
|
||||
emqx_config:put(basic_conf()),
|
||||
Conf.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% CT Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -242,14 +72,11 @@ init_per_testcase(_TestCase, Config) ->
|
|||
fun(_) -> {ok, #{is_superuser => false}} end
|
||||
),
|
||||
ok = meck:expect(emqx_access_control, authorize, fun(_, _, _) -> allow end),
|
||||
%% Set confs
|
||||
OldConf = set_test_listener_confs(),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
[{config, OldConf} | Config].
|
||||
Config.
|
||||
|
||||
end_per_testcase(_TestCase, Config) ->
|
||||
meck:unload([emqx_access_control]),
|
||||
emqx_config:put(?config(config, Config)),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
Config.
|
||||
|
||||
|
|
|
@ -383,7 +383,7 @@ t_certcn_as_clientid_tlsv1_2(_) ->
|
|||
tls_certcn_as_clientid('tlsv1.2').
|
||||
|
||||
t_peercert_preserved_before_connected(_) ->
|
||||
ok = emqx_config:put_zone_conf(default, [mqtt], #{}),
|
||||
ok = emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], false),
|
||||
ok = emqx_hooks:add(
|
||||
'client.connect',
|
||||
{?MODULE, on_hook, ['client.connect', self()]},
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
@ -96,3 +97,302 @@ t_unknown_rook_keys(_) ->
|
|||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_init_load_emqx_schema(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When load emqx_schema
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
%% Then default zone is injected with all global defaults
|
||||
Default = emqx_config:get([zones, default]),
|
||||
MQTT = emqx_config:get([mqtt]),
|
||||
Stats = emqx_config:get([stats]),
|
||||
FD = emqx_config:get([flapping_detect]),
|
||||
FS = emqx_config:get([force_shutdown]),
|
||||
CC = emqx_config:get([conn_congestion]),
|
||||
FG = emqx_config:get([force_gc]),
|
||||
OP = emqx_config:get([overload_protection]),
|
||||
?assertMatch(
|
||||
#{
|
||||
mqtt := MQTT,
|
||||
stats := Stats,
|
||||
flapping_detect := FD,
|
||||
force_shutdown := FS,
|
||||
conn_congestion := CC,
|
||||
force_gc := FG,
|
||||
overload_protection := OP
|
||||
},
|
||||
Default
|
||||
).
|
||||
|
||||
t_init_zones_load_emqx_schema_no_default_for_none_existing(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When emqx_schema is loaded
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
%% Then read for none existing zone should throw error
|
||||
?assertError(
|
||||
{config_not_found, [zones, no_exists]},
|
||||
emqx_config:get([zones, no_exists])
|
||||
).
|
||||
|
||||
t_init_zones_load_other_schema(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty config file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When load emqx_limiter_schema, not emqx_schema
|
||||
%% Then load should success
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_limiter_schema)),
|
||||
%% Then no zones is loaded.
|
||||
?assertError(
|
||||
{config_not_found, [zones]},
|
||||
emqx_config:get([zones])
|
||||
),
|
||||
%% Then no default zone is loaded.
|
||||
?assertError(
|
||||
{config_not_found, [zones, default]},
|
||||
emqx_config:get([zones, default])
|
||||
).
|
||||
|
||||
t_init_zones_with_user_defined_default_zone(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given user defined config for default zone
|
||||
ConfFile = prepare_conf_file(
|
||||
?FUNCTION_NAME, <<"zones.default.mqtt.max_topic_alias=1024">>, Config
|
||||
),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When schema is loaded
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
|
||||
%% Then user defined value is set
|
||||
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, default])),
|
||||
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
|
||||
?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
|
||||
%% Then others are defaults
|
||||
?assertEqual(ExpectedOthers, Others).
|
||||
|
||||
t_init_zones_with_user_defined_other_zone(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given user defined config for default zone
|
||||
ConfFile = prepare_conf_file(
|
||||
?FUNCTION_NAME, <<"zones.myzone.mqtt.max_topic_alias=1024">>, Config
|
||||
),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When schema is loaded
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
%% Then user defined value is set and others are defaults
|
||||
|
||||
%% Then user defined value is set
|
||||
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
|
||||
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, zone_global_defaults()),
|
||||
?assertEqual(ZGDMQTT#{max_topic_alias := 1024}, MqttV),
|
||||
%% Then others are defaults
|
||||
?assertEqual(ExpectedOthers, Others),
|
||||
%% Then default zone still have the defaults
|
||||
?assertEqual(zone_global_defaults(), emqx_config:get([zones, default])).
|
||||
|
||||
t_init_zones_with_cust_root_mqtt(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given config file with mqtt user overrides
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"mqtt.retry_interval=10m">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
%% When emqx_schema is loaded
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
%% Then the value is reflected as internal representation in default `zone'
|
||||
%% and other fields under mqtt are defaults.
|
||||
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||
?assertEqual(
|
||||
GDefaultMqtt#{retry_interval := 600000},
|
||||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_default_zone_is_updated_after_global_defaults_updated(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given empty emqx conf
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
?assertNotEqual(900000, emqx_config:get([zones, default, mqtt, retry_interval])),
|
||||
%% When emqx_schema is loaded
|
||||
emqx_config:put([mqtt, retry_interval], 900000),
|
||||
%% Then the value is reflected in default `zone' and other fields under mqtt are defaults.
|
||||
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||
?assertEqual(
|
||||
GDefaultMqtt#{retry_interval := 900000},
|
||||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_myzone_is_updated_after_global_defaults_updated(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user override in myzone (none default zone)
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=32">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
?assertNotEqual(900000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
|
||||
%% When update another value of global default
|
||||
emqx_config:put([mqtt, retry_interval], 900000),
|
||||
%% Then the value is reflected in myzone and the user defined value unchanged.
|
||||
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||
?assertEqual(
|
||||
GDefaultMqtt#{
|
||||
retry_interval := 900000,
|
||||
max_inflight := 32
|
||||
},
|
||||
emqx_config:get([zones, myzone, mqtt])
|
||||
),
|
||||
%% Then the value is reflected in default zone as well.
|
||||
?assertEqual(
|
||||
GDefaultMqtt#{retry_interval := 900000},
|
||||
emqx_config:get([zones, default, mqtt])
|
||||
).
|
||||
|
||||
t_zone_no_user_defined_overrides(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user specified myzone
|
||||
ConfFile = prepare_conf_file(
|
||||
?FUNCTION_NAME, <<"zones.myzone.mqtt.retry_interval=10m">>, Config
|
||||
),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
?assertEqual(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])),
|
||||
%% When there is an update in global default
|
||||
emqx_config:put([mqtt, max_inflight], 2),
|
||||
%% Then the value is reflected in both default and myzone
|
||||
?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||
?assertMatch(2, emqx_config:get([zones, myzone, mqtt, max_inflight])),
|
||||
%% Then user defined value from config is not overwritten
|
||||
?assertMatch(600000, emqx_config:get([zones, myzone, mqtt, retry_interval])).
|
||||
|
||||
t_zone_no_user_defined_overrides_internal_represent(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given emqx conf file with user specified myzone
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.myzone.mqtt.max_inflight=1">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
?assertEqual(1, emqx_config:get([zones, myzone, mqtt, max_inflight])),
|
||||
%% When there is an update in global default
|
||||
emqx_config:put([mqtt, max_inflight], 2),
|
||||
%% Then the value is reflected in default `zone' but not user-defined zone
|
||||
?assertMatch(2, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||
?assertMatch(1, emqx_config:get([zones, myzone, mqtt, max_inflight])).
|
||||
|
||||
t_update_global_defaults_no_updates_on_user_overrides(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given default zone config in conf file.
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"zones.default.mqtt.max_inflight=1">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
?assertEqual(1, emqx_config:get([zones, default, mqtt, max_inflight])),
|
||||
%% When there is an update in global default
|
||||
emqx_config:put([mqtt, max_inflight], 20),
|
||||
%% Then the value is not reflected in default `zone'
|
||||
?assertMatch(1, emqx_config:get([zones, default, mqtt, max_inflight])).
|
||||
|
||||
t_zone_update_with_new_zone(Config) ->
|
||||
emqx_config:erase_all(),
|
||||
%% Given loaded an empty conf file
|
||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, <<"">>, Config),
|
||||
application:set_env(emqx, config_files, [ConfFile]),
|
||||
?assertEqual(ok, emqx_config:init_load(emqx_schema)),
|
||||
%% When there is an update for creating new zone config
|
||||
ok = emqx_config:put([zones, myzone, mqtt, max_inflight], 2),
|
||||
%% Then the value is set and other roots are created with defaults.
|
||||
GDefaultMqtt = maps:get(mqtt, zone_global_defaults()),
|
||||
?assertEqual(
|
||||
GDefaultMqtt#{max_inflight := 2},
|
||||
emqx_config:get([zones, myzone, mqtt])
|
||||
).
|
||||
|
||||
t_init_zone_with_global_defaults(_Config) ->
|
||||
%% Given uninitialized empty config
|
||||
emqx_config:erase_all(),
|
||||
Zones = #{myzone => #{mqtt => #{max_inflight => 3}}},
|
||||
%% when put zones with global default with emqx_config:put/1
|
||||
GlobalDefaults = zone_global_defaults(),
|
||||
AllConf = maps:put(zones, Zones, GlobalDefaults),
|
||||
%% Then put sucess
|
||||
?assertEqual(ok, emqx_config:put(AllConf)),
|
||||
%% Then GlobalDefaults are set
|
||||
?assertEqual(GlobalDefaults, maps:with(maps:keys(GlobalDefaults), emqx_config:get([]))),
|
||||
%% Then my zone and default zone are set
|
||||
{MqttV, Others} = maps:take(mqtt, emqx_config:get([zones, myzone])),
|
||||
{ZGDMQTT, ExpectedOthers} = maps:take(mqtt, GlobalDefaults),
|
||||
?assertEqual(ZGDMQTT#{max_inflight := 3}, MqttV),
|
||||
%% Then others are defaults
|
||||
?assertEqual(ExpectedOthers, Others).
|
||||
|
||||
%%%
|
||||
%%% Helpers
|
||||
%%%
|
||||
prepare_conf_file(Name, Content, CTConfig) ->
|
||||
Filename = tc_conf_file(Name, CTConfig),
|
||||
filelib:ensure_dir(Filename),
|
||||
ok = file:write_file(Filename, Content),
|
||||
Filename.
|
||||
|
||||
tc_conf_file(TC, Config) ->
|
||||
DataDir = ?config(data_dir, Config),
|
||||
filename:join([DataDir, TC, 'emqx.conf']).
|
||||
|
||||
zone_global_defaults() ->
|
||||
#{
|
||||
conn_congestion =>
|
||||
#{enable_alarm => true, min_alarm_sustain_duration => 60000},
|
||||
flapping_detect =>
|
||||
#{ban_time => 300000, max_count => 15, window_time => disabled},
|
||||
force_gc =>
|
||||
#{bytes => 16777216, count => 16000, enable => true},
|
||||
force_shutdown =>
|
||||
#{
|
||||
enable => true,
|
||||
max_heap_size => 4194304,
|
||||
max_mailbox_size => 1000
|
||||
},
|
||||
mqtt =>
|
||||
#{
|
||||
await_rel_timeout => 300000,
|
||||
exclusive_subscription => false,
|
||||
idle_timeout => 15000,
|
||||
ignore_loop_deliver => false,
|
||||
keepalive_backoff => 0.75,
|
||||
keepalive_multiplier => 1.5,
|
||||
max_awaiting_rel => 100,
|
||||
max_clientid_len => 65535,
|
||||
max_inflight => 32,
|
||||
max_mqueue_len => 1000,
|
||||
max_packet_size => 1048576,
|
||||
max_qos_allowed => 2,
|
||||
max_subscriptions => infinity,
|
||||
max_topic_alias => 65535,
|
||||
max_topic_levels => 128,
|
||||
mqueue_default_priority => lowest,
|
||||
mqueue_priorities => disabled,
|
||||
mqueue_store_qos0 => true,
|
||||
peer_cert_as_clientid => disabled,
|
||||
peer_cert_as_username => disabled,
|
||||
response_information => [],
|
||||
retain_available => true,
|
||||
retry_interval => 30000,
|
||||
server_keepalive => disabled,
|
||||
session_expiry_interval => 7200000,
|
||||
shared_subscription => true,
|
||||
strict_mode => false,
|
||||
upgrade_qos => false,
|
||||
use_username_as_clientid => false,
|
||||
wildcard_subscription => true
|
||||
},
|
||||
overload_protection =>
|
||||
#{
|
||||
backoff_delay => 1,
|
||||
backoff_gc => false,
|
||||
backoff_hibernation => true,
|
||||
backoff_new_conn => true,
|
||||
enable => false
|
||||
},
|
||||
stats => #{enable => true}
|
||||
}.
|
||||
|
|
|
@ -57,7 +57,6 @@ init_per_suite(Config) ->
|
|||
ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end),
|
||||
ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end),
|
||||
|
||||
emqx_channel_SUITE:set_test_listener_confs(),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_channel_SUITE:set_test_listener_confs(),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
ok = meck:new(
|
||||
[emqx_hooks, emqx_metrics, emqx_broker],
|
||||
[passthrough, no_history, no_link]
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-define(SUITE, ?MODULE).
|
||||
|
||||
|
@ -46,12 +47,30 @@
|
|||
all() -> emqx_common_test_helpers:all(?SUITE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
net_kernel:start(['master@127.0.0.1', longnames]),
|
||||
DistPid =
|
||||
case net_kernel:nodename() of
|
||||
ignored ->
|
||||
%% calling `net_kernel:start' without `epmd'
|
||||
%% running will result in a failure.
|
||||
emqx_common_test_helpers:start_epmd(),
|
||||
{ok, Pid} = net_kernel:start(['master@127.0.0.1', longnames]),
|
||||
ct:pal("start epmd, node name: ~p", [node()]),
|
||||
Pid;
|
||||
_ ->
|
||||
undefined
|
||||
end,
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
[{dist_pid, DistPid} | Config].
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
end_per_suite(Config) ->
|
||||
DistPid = ?config(dist_pid, Config),
|
||||
case DistPid of
|
||||
Pid when is_pid(Pid) ->
|
||||
net_kernel:stop();
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
|
|
|
@ -34,7 +34,6 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
|||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_channel_SUITE:set_test_listener_confs(),
|
||||
?check_trace(
|
||||
?wait_async_action(
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
|
|
|
@ -137,7 +137,6 @@ end_per_testcase(_, Config) ->
|
|||
Config.
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_channel_SUITE:set_test_listener_confs(),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ t_start_no_session(_Config) ->
|
|||
Opts = #{
|
||||
clientinfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
zone => internal
|
||||
zone => default
|
||||
},
|
||||
conninfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
|
@ -76,7 +76,7 @@ t_start_no_expire(_Config) ->
|
|||
Opts = #{
|
||||
clientinfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
zone => internal
|
||||
zone => default
|
||||
},
|
||||
conninfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
|
@ -97,7 +97,7 @@ t_start_infinite_expire(_Config) ->
|
|||
Opts = #{
|
||||
clientinfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
zone => internal
|
||||
zone => default
|
||||
},
|
||||
conninfo => #{
|
||||
clientid => ?CLIENT_ID,
|
||||
|
|
|
@ -137,9 +137,14 @@ t_global_zone(_Config) ->
|
|||
),
|
||||
?assertEqual(lists:usort(ZonesKeys), lists:usort(maps:keys(Zones))),
|
||||
?assertEqual(
|
||||
emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed]),
|
||||
emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed]),
|
||||
emqx_utils_maps:deep_get([<<"mqtt">>, <<"max_qos_allowed">>], Zones)
|
||||
),
|
||||
?assertError(
|
||||
{config_not_found, [zones, no_default, mqtt, max_qos_allowed]},
|
||||
emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])
|
||||
),
|
||||
|
||||
NewZones1 = emqx_utils_maps:deep_put([<<"mqtt">>, <<"max_qos_allowed">>], Zones, 1),
|
||||
NewZones2 = emqx_utils_maps:deep_remove([<<"mqtt">>, <<"peer_cert_as_clientid">>], NewZones1),
|
||||
{ok, #{<<"mqtt">> := Res}} = update_global_zone(NewZones2),
|
||||
|
@ -151,7 +156,11 @@ t_global_zone(_Config) ->
|
|||
},
|
||||
Res
|
||||
),
|
||||
?assertEqual(1, emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])),
|
||||
?assertEqual(1, emqx_config:get_zone_conf(default, [mqtt, max_qos_allowed])),
|
||||
?assertError(
|
||||
{config_not_found, [zones, no_default, mqtt, max_qos_allowed]},
|
||||
emqx_config:get_zone_conf(no_default, [mqtt, max_qos_allowed])
|
||||
),
|
||||
%% Make sure the override config is updated, and remove the default value.
|
||||
?assertMatch(#{<<"max_qos_allowed">> := 1}, read_conf(<<"mqtt">>)),
|
||||
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Reducing overhead of reading configs per zone.
|
||||
|
Loading…
Reference in New Issue