feat(config): update emqx_schema for logger

This commit is contained in:
Shawn 2021-07-01 11:47:33 +08:00
parent d2c50baf93
commit 66aaed1f87
2 changed files with 55 additions and 475 deletions

View File

@ -336,7 +336,7 @@ log {
## @doc log.console_handler.enable
## ValueType: Boolean
## Default: false
console_handler.enable = false
console_handler.enable: false
## The log level of this handler
## All the log messages with levels lower than this level will
@ -345,13 +345,13 @@ log {
## @doc log.console_handler.level
## ValueType: debug | info | notice | warning | error | critical | alert | emergency
## Default: warning
console_handler.level = warning
console_handler.level: warning
##----------------------------------------------------------------
## The file log handlers send log messages to files
##----------------------------------------------------------------
## file_handlers.<name>
file_handlers.emqx_default: {
file_handlers.emqx_log: {
## The log level filter of this handler
## All the log messages with levels lower than this level will
## be dropped.
@ -383,7 +383,7 @@ log {
## @doc log.file_handlers.<name>.rotation.enable
## ValueType: Boolean
## Default: true
rotation.enable = true
rotation.enable: true
## Maximum rotation count of log files.
##
@ -409,7 +409,7 @@ log {
##
## You could also create multiple file handlers for different
## log level for example:
file_handlers.emqx_error: {
file_handlers.emqx_error_log: {
level: error
file: "{{ platform_log_dir }}/error.log"
}
@ -422,7 +422,7 @@ log {
## - "utc" for Coordinated Universal Time (UTC)
## - "+hh:mm" or "-hh:mm" for a specified offset
## Default: system
time_offset = system
time_offset: system
## Limits the total number of characters printed for each log event.
##
@ -437,7 +437,7 @@ log {
## @doc log.max_depth
## ValueType: Integer | infinity
## Default: 80
max_depth = 80
max_depth: 80
## Log formatter
## @doc log.formatter
@ -2120,7 +2120,7 @@ example_common_ssl_options {
## @doc listeners.<name>.ssl.depth
## ValueType: Number
## Default: 10
ssl.depth = 10
ssl.depth: 10
## String containing the user's password. Only used if the private keyfile
## is password-protected.
@ -2130,7 +2130,7 @@ example_common_ssl_options {
## @doc listeners.<name>.ssl.depth
## ValueType: String
## Default: ""
#ssl.key_password = ""
#ssl.key_password: ""
## The Ephemeral Diffie-Hellman key exchange is a very effective way of
## ensuring Forward Secrecy by exchanging a set of keys that never hit
@ -2248,14 +2248,14 @@ example_common_websocket_options {
## @doc listeners.<name>.websocket.fail_if_no_subprotocol
## ValueType: Boolean
## Default: true
websocket.fail_if_no_subprotocol = true
websocket.fail_if_no_subprotocol: true
## Enable origin check in header for websocket connection
##
## @doc listeners.<name>.websocket.check_origin_enable
## ValueType: Boolean
## Default: false
websocket.check_origin_enable = false
websocket.check_origin_enable: false
## Allow origin to be absent in header in websocket connection
## when check_origin_enable is true
@ -2263,7 +2263,7 @@ example_common_websocket_options {
## @doc listeners.<name>.websocket.allow_origin_absence
## ValueType: Boolean
## Default: true
websocket.allow_origin_absence = true
websocket.allow_origin_absence: true
## Comma separated list of allowed origin in header for websocket connection
##
@ -2273,7 +2273,7 @@ example_common_websocket_options {
## local http dashboard url
## check_origins: "http://localhost:18083, http://127.0.0.1:18083"
## Default: ""
websocket.check_origins = "http://localhost:18083, http://127.0.0.1:18083"
websocket.check_origins: "http://localhost:18083, http://127.0.0.1:18083"
## Specify which HTTP header for real source IP if the EMQ X cluster is
## deployed behind NGINX or HAProxy.
@ -2281,7 +2281,7 @@ example_common_websocket_options {
## @doc listeners.<name>.websocket.proxy_address_header
## ValueType: String
## Default: X-Forwarded-For
websocket.proxy_address_header = X-Forwarded-For
websocket.proxy_address_header: X-Forwarded-For
## Specify which HTTP header for real source port if the EMQ X cluster is
## deployed behind NGINX or HAProxy.
@ -2289,7 +2289,7 @@ example_common_websocket_options {
## @doc listeners.<name>.websocket.proxy_port_header
## ValueType: String
## Default: X-Forwarded-Port
websocket.proxy_port_header = X-Forwarded-Port
websocket.proxy_port_header: X-Forwarded-Port
websocket.deflate_opts {
## The level of deflate options for external WebSocket connections.
@ -2345,4 +2345,4 @@ example_common_websocket_options {
## Default: 15
client_max_window_bits: 15
}
}
}

View File

@ -508,72 +508,15 @@ mqtt_listener() ->
, {"proxy_protocol_timeout", t(duration())}
].
translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"].
translation("ekka") ->
[ {"cluster_discovery", fun tr_cluster__discovery/1}];
translation("vm_args") ->
[ {"+zdbbl", fun tr_zdbbl/1}
, {"-heart", fun tr_heart/1}];
translation("gen_rpc") ->
[ {"tcp_client_num", fun tr_tcp_client_num/1}
, {"tcp_client_port", fun tr_tcp_client_port/1}];
translations() -> ["kernel"].
translation("kernel") ->
[ {"logger_level", fun tr_logger_level/1}
, {"logger", fun tr_logger/1}];
, {"logger", fun tr_logger/1}].
translation("emqx") ->
[ {"flapping_detect_policy", fun tr_flapping_detect_policy/1}
, {"zones", fun tr_zones/1}
, {"listeners", fun tr_listeners/1}
, {"modules", fun tr_modules/1}
, {"alarm", fun tr_alarm/1}
, {"telemetry", fun tr_telemetry/1}
].
%% Translate "cluster.discovery" into {Strategy, Options} for ekka.
%% Strategy-specific options are built by options/2 below; filter/1
%% (project helper) drops entries whose value is undefined.
tr_cluster__discovery(Conf) ->
Strategy = conf_get("cluster.discovery", Conf),
{Strategy, filter(options(Strategy, Conf))}.
%% Map "node.heartbeat" onto the "-heart" vm.args flag.
%% true or "on" enables the flag (flag takes no value, hence "");
%% any other value yields undefined, i.e. the flag is omitted.
tr_heart(Conf) ->
case conf_get("node.heartbeat", Conf) of
true -> "";
"on" -> "";
_ -> undefined
end.
%% Translate "node.dist_buffer_size" (bytes) into the "+zdbbl" vm.args
%% value, which is expressed in kilobytes; rounds up via ceiling/1
%% (project helper). Missing or non-integer input yields undefined.
tr_zdbbl(Conf) ->
case conf_get("node.dist_buffer_size", Conf) of
undefined -> undefined;
X when is_integer(X) -> ceiling(X / 1024); %% Bytes to Kilobytes;
_ -> undefined
end.
%% Force client to use server listening port, because we do not provide
%% per-node listening port manual mapping from configs.
%% i.e. all nodes in the cluster should agree to the same
%% listening port number.
%% Number of gen_rpc TCP client connections.
%% 0 means "auto": half the scheduler count, but at least 1.
tr_tcp_client_num(Conf) ->
case conf_get("rpc.tcp_client_num", Conf) of
0 -> max(1, erlang:system_info(schedulers) div 2);
V -> V
end.
%% gen_rpc clients connect to the same port the server listens on,
%% since per-node client ports are not configurable.
tr_tcp_client_port(Conf) ->
conf_get("rpc.tcp_server_port", Conf).
tr_logger_level(Conf) -> conf_get("log.level", Conf).
tr_logger_level(Conf) -> conf_get("log.primary_level", Conf).
tr_logger(Conf) ->
LogTo = conf_get("log.to", Conf),
LogLevel = conf_get("log.level", Conf),
LogType = case conf_get("log.rotation.enable", Conf) of
true -> wrap;
_ -> halt
end,
CharsLimit = case conf_get("log.chars_limit", Conf) of
-1 -> unlimited;
V -> V
@ -581,309 +524,56 @@ tr_logger(Conf) ->
SingleLine = conf_get("log.single_line", Conf),
FmtName = conf_get("log.formatter", Conf),
Formatter = formatter(FmtName, CharsLimit, SingleLine),
BurstLimit = conf_get("log.burst_limit", Conf),
{BustLimitOn, {MaxBurstCount, TimeWindow}} = burst_limit(BurstLimit),
FileConf = fun (Filename) ->
BasicConf =
#{type => LogType,
file => filename:join(conf_get("log.dir", Conf), Filename),
max_no_files => conf_get("log.rotation.count", Conf),
sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf),
drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf),
flush_qlen => conf_get("log.flush_qlen", Conf),
overload_kill_enable => conf_get("log.overload_kill", Conf),
overload_kill_qlen => conf_get("log.overload_kill_qlen", Conf),
overload_kill_mem_size => conf_get("log.overload_kill_mem_size", Conf),
overload_kill_restart_after => conf_get("log.overload_kill_restart_after", Conf),
burst_limit_enable => BustLimitOn,
burst_limit_max_count => MaxBurstCount,
burst_limit_window_time => TimeWindow
},
MaxNoBytes = case LogType of
wrap -> conf_get("log.rotation.size", Conf);
halt -> conf_get("log.size", Conf)
end,
BasicConf#{max_no_bytes => MaxNoBytes} end,
BasicConf = #{
sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf),
drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf),
flush_qlen => conf_get("log.flush_qlen", Conf),
overload_kill_enable => conf_get("log.overload_kill.enable", Conf),
overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf),
overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf),
overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf),
burst_limit_enable => conf_get("log.burst_limit.enable", Conf),
burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf),
burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf)
},
Filters = case conf_get("log.supervisor_reports", Conf) of
error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
progress -> []
end,
%% For the default logger that outputs to console
DefaultHandler =
if LogTo =:= console orelse LogTo =:= both ->
[{handler, console, logger_std_h,
#{level => LogLevel,
config => #{type => standard_io},
formatter => Formatter,
filters => Filters
}
}];
ConsoleHandler =
case conf_get("log.console_handler.enable", Conf) of
true ->
[{handler, default, undefined}]
end,
%% For the file logger
FileHandler =
if LogTo =:= file orelse LogTo =:= both ->
[{handler, file, logger_disk_log_h,
#{level => LogLevel,
config => FileConf(conf_get("log.file", Conf)),
[{handler, console, logger_std_h, #{
level => conf_get("log.console_handler.level", Conf),
config => BasicConf#{type => standard_io},
formatter => Formatter,
filesync_repeat_interval => no_repeat,
filters => Filters
}}];
true -> []
false -> []
end,
AdditionalLogFiles = additional_log_files(Conf),
AdditionalHandlers =
[{handler, list_to_atom("file_for_"++Level), logger_disk_log_h,
#{level => list_to_atom(Level),
config => FileConf(Filename),
%% For the file logger
FileHandlers =
[{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{
level => conf_get("level", SubConf),
config => BasicConf#{
type => case conf_get("rotation.enable", SubConf) of
true -> wrap;
_ -> halt
end,
file => conf_get("file", SubConf),
max_no_files => conf_get("rotation.count", SubConf),
max_no_bytes => conf_get("max_size", SubConf)
},
formatter => Formatter,
filesync_repeat_interval => no_repeat}}
|| {Level, Filename} <- AdditionalLogFiles],
filters => Filters,
filesync_repeat_interval => no_repeat
}}
|| {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf))],
DefaultHandler ++ FileHandler ++ AdditionalHandlers.
%% Parse "acl.flapping_detect_policy", a three-element list of strings:
%% [Threshold, Duration, BannedInterval]. Durations are parsed with the
%% project helpers to_duration/1 and to_duration_s/1; a parse failure
%% raises error({duration, Reason}).
tr_flapping_detect_policy(Conf) ->
[Threshold, Duration, Interval] = conf_get("acl.flapping_detect_policy", Conf),
ParseDuration = fun(S, F) ->
case F(S) of
{ok, I} -> I;
{error, Reason} -> error({duration, Reason})
end end,
#{threshold => list_to_integer(Threshold),
duration => ParseDuration(Duration, fun to_duration/1),
banned_interval => ParseDuration(Interval, fun to_duration_s/1)
}.
%% Build the per-zone option list from "zone.<Name>.<Key>" settings.
%% Each key is mapped through map_zones/2; entries with an undefined
%% value are dropped, as are empty ratelimit/quota lists.
tr_zones(Conf) ->
Names = lists:usort(keys("zone", Conf)),
lists:foldl(
fun(Name, Zones) ->
Zone = keys("zone." ++ Name, Conf),
Mapped = lists:flatten([map_zones(K, conf_get(["zone", Name, K], Conf)) || K <- Zone]),
[{list_to_atom(Name), lists:filter(fun ({K, []}) when K =:= ratelimit; K =:= quota -> false;
({_, undefined}) -> false;
(_) -> true end, Mapped)} | Zones]
end, [], Names).
%% Build the internal listener descriptions (one map per listener) from
%% the "listener.{tcp,ws,ssl,wss}.<Name>.*" config subtree. Each local
%% fun below extracts and normalizes one family of options; filter/1,
%% keys/2, to_bytesize/1, to_duration_s/1 and tr_ssl/2 are project
%% helpers defined elsewhere in this file.
tr_listeners(Conf) ->
%% Convert a binary/string value to an atom; undefined passes through.
Atom = fun(undefined) -> undefined;
(B) when is_binary(B)-> binary_to_atom(B);
(S) when is_list(S) -> list_to_atom(S) end,
%% Parse one access rule: "<allow|deny> <CIDR|all>".
Access = fun(S) ->
[A, CIDR] = string:tokens(S, " "),
{list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end}
end,
%% Collect all "<Prefix>.access.<Id>" rules (empty list if none).
AccOpts = fun(Prefix) ->
case keys(Prefix ++ ".access", Conf) of
[] -> [];
Ids ->
[{access_rules, [Access(conf_get(Prefix ++ ".access." ++ Id, Conf)) || Id <- Ids]}]
end end,
%% Parse a [Bytes, Duration] rate limit; parse failure raises error/1.
RateLimit = fun(undefined) ->
undefined;
([L, D]) ->
Limit = case to_bytesize(L) of
{ok, I0} -> I0;
{error, R0} -> error({bytesize, R0})
end,
Duration = case to_duration_s(D) of
{ok, I1} -> I1;
{error, R1} -> error({duration, R1})
end,
{Limit, Duration}
end,
%% Trim each websocket origin and convert to binary.
CheckOrigin = fun(S) -> [ list_to_binary(string:trim(O)) || O <- S] end,
WsOpts = fun(Prefix) ->
case conf_get(Prefix ++ ".check_origins", Conf) of
undefined -> undefined;
Rules -> lists:flatten(CheckOrigin(Rules))
end
end,
%% Common listener options; filter/1 drops undefined values.
LisOpts = fun(Prefix) ->
filter([{acceptors, conf_get(Prefix ++ ".acceptors", Conf)},
{mqtt_path, conf_get(Prefix ++ ".mqtt_path", Conf)},
{max_connections, conf_get(Prefix ++ ".max_connections", Conf)},
{max_conn_rate, conf_get(Prefix ++ ".max_conn_rate", Conf)},
{active_n, conf_get(Prefix ++ ".active_n", Conf)},
{tune_buffer, conf_get(Prefix ++ ".tune_buffer", Conf)},
{zone, Atom(conf_get(Prefix ++ ".zone", Conf))},
{rate_limit, RateLimit(conf_get(Prefix ++ ".rate_limit", Conf))},
{proxy_protocol, conf_get(Prefix ++ ".proxy_protocol", Conf)},
%% Proxy headers are lower-cased for case-insensitive matching.
{proxy_address_header, list_to_binary(string:lowercase(conf_get(Prefix ++ ".proxy_address_header", Conf, <<"">>)))},
{proxy_port_header, list_to_binary(string:lowercase(conf_get(Prefix ++ ".proxy_port_header", Conf, <<"">>)))},
{proxy_protocol_timeout, conf_get(Prefix ++ ".proxy_protocol_timeout", Conf)},
{fail_if_no_subprotocol, conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf)},
{supported_subprotocols, string:tokens(conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")},
{peer_cert_as_username, conf_get(Prefix ++ ".peer_cert_as_username", Conf)},
{peer_cert_as_clientid, conf_get(Prefix ++ ".peer_cert_as_clientid", Conf)},
{compress, conf_get(Prefix ++ ".compress", Conf)},
{idle_timeout, conf_get(Prefix ++ ".idle_timeout", Conf)},
{max_frame_size, conf_get(Prefix ++ ".max_frame_size", Conf)},
{mqtt_piggyback, conf_get(Prefix ++ ".mqtt_piggyback", Conf)},
{check_origin_enable, conf_get(Prefix ++ ".check_origin_enable", Conf)},
{allow_origin_absence, conf_get(Prefix ++ ".allow_origin_absence", Conf)},
{check_origins, WsOpts(Prefix)} | AccOpts(Prefix)])
end,
%% Websocket permessage-deflate options.
DeflateOpts = fun(Prefix) ->
filter([{level, conf_get(Prefix ++ ".deflate_opts.level", Conf)},
{mem_level, conf_get(Prefix ++ ".deflate_opts.mem_level", Conf)},
{strategy, conf_get(Prefix ++ ".deflate_opts.strategy", Conf)},
{server_context_takeover, conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf)},
{client_context_takeover, conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf)},
{server_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf)},
{client_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf)}])
end,
%% Raw TCP socket options.
TcpOpts = fun(Prefix) ->
filter([{backlog, conf_get(Prefix ++ ".backlog", Conf)},
{send_timeout, conf_get(Prefix ++ ".send_timeout", Conf)},
{send_timeout_close, conf_get(Prefix ++ ".send_timeout_close", Conf)},
{recbuf, conf_get(Prefix ++ ".recbuf", Conf)},
{sndbuf, conf_get(Prefix ++ ".sndbuf", Conf)},
{buffer, conf_get(Prefix ++ ".buffer", Conf)},
{high_watermark, conf_get(Prefix ++ ".high_watermark", Conf)},
{nodelay, conf_get(Prefix ++ ".nodelay", Conf, true)},
{reuseaddr, conf_get(Prefix ++ ".reuseaddr", Conf)}])
end,
%% SSL options via tr_ssl/2; ciphers are mandatory for TLS listeners.
SslOpts = fun(Prefix) ->
Opts = tr_ssl(Prefix, Conf),
case lists:keyfind(ciphers, 1, Opts) of
false ->
error(Prefix ++ ".ciphers or " ++ Prefix ++ ".psk_ciphers is absent");
_ ->
Opts
end end,
%% Plain tcp/ws listener; a missing endpoint yields an empty listen_on.
TcpListeners = fun(Type, Name) ->
Prefix = string:join(["listener", Type, Name], "."),
ListenOnN = case conf_get(Prefix ++ ".endpoint", Conf) of
undefined -> [];
ListenOn -> ListenOn
end,
[#{ proto => Atom(Type)
, name => Name
, listen_on => ListenOnN
, opts => [ {deflate_options, DeflateOpts(Prefix)}
, {tcp_options, TcpOpts(Prefix)}
| LisOpts(Prefix)
]
}
]
end,
%% ssl/wss listener; skipped entirely when no endpoint is configured.
SslListeners = fun(Type, Name) ->
Prefix = string:join(["listener", Type, Name], "."),
case conf_get(Prefix ++ ".endpoint", Conf) of
undefined ->
[];
ListenOn ->
[#{ proto => Atom(Type)
, name => Name
, listen_on => ListenOn
, opts => [ {deflate_options, DeflateOpts(Prefix)}
, {tcp_options, TcpOpts(Prefix)}
, {ssl_options, SslOpts(Prefix)}
| LisOpts(Prefix)
]
}
]
end end,
lists:flatten([TcpListeners("tcp", Name) || Name <- keys("listener.tcp", Conf)]
++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)]
++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)]
++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]).
%% Assemble the emqx modules list with their options: presence,
%% subscription (from "module.subscription.<N>.*"), topic rewrite
%% rules, topic_metrics, delayed, and the internal ACL module.
tr_modules(Conf) ->
%% Auto-subscriptions: {TopicBinary, SubOpts} per configured entry;
%% qos/nl/rap/rh default to 0 when unset.
Subscriptions = fun() ->
List = keys("module.subscription", Conf),
TopicList = [{N, conf_get(["module", "subscription", N, "topic"], Conf)}|| N <- List],
[{list_to_binary(T), #{ qos => conf_get("module.subscription." ++ N ++ ".qos", Conf, 0),
nl => conf_get("module.subscription." ++ N ++ ".nl", Conf, 0),
rap => conf_get("module.subscription." ++ N ++ ".rap", Conf, 0),
rh => conf_get("module.subscription." ++ N ++ ".rh", Conf, 0)
}} || {N, T} <- TopicList]
end,
%% Rewrite rules: a generic "rule" applies to both pub and sub,
%% while pub_rule/sub_rule apply to only one direction. Each rule
%% string is "<Topic> <Regex> <Destination>".
Rewrites = fun() ->
Rules = keys("module.rewrite.rule", Conf),
PubRules = keys("module.rewrite.pub_rule", Conf),
SubRules = keys("module.rewrite.sub_rule", Conf),
TotalRules =
[ {["module", "rewrite", "pub", "rule", R], conf_get(["module.rewrite.rule", R], Conf)} || R <- Rules] ++
[ {["module", "rewrite", "pub", "rule", R], conf_get(["module.rewrite.pub_rule", R], Conf)} || R <- PubRules] ++
[ {["module", "rewrite", "sub", "rule", R], conf_get(["module.rewrite.rule", R], Conf)} || R <- Rules] ++
[ {["module", "rewrite", "sub", "rule", R], conf_get(["module.rewrite.sub_rule", R], Conf)} || R <- SubRules],
lists:map(fun({[_, "rewrite", PubOrSub, "rule", _], Rule}) ->
[Topic, Re, Dest] = string:tokens(Rule, " "),
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, TotalRules)
end,
lists:append([
[{emqx_mod_presence, [{qos, conf_get("module.presence.qos", Conf, 1)}]}],
[{emqx_mod_subscription, Subscriptions()}],
[{emqx_mod_rewrite, Rewrites()}],
[{emqx_mod_topic_metrics, []}],
[{emqx_mod_delayed, []}],
[{emqx_mod_acl_internal, [{acl_file, conf_get("acl.acl_file", Conf)}]}]
]).
%% Translate the "alarm.*" settings; actions are converted from
%% strings to atoms.
tr_alarm(Conf) ->
[ {actions, [list_to_atom(Action) || Action <- conf_get("alarm.actions", Conf)]}
, {size_limit, conf_get("alarm.size_limit", Conf)}
, {validity_period, conf_get("alarm.validity_period", Conf)}
].
%% Translate the "telemetry.*" settings verbatim into a proplist.
tr_telemetry(Conf) ->
[ {enabled, conf_get("telemetry.enabled", Conf)}
, {url, conf_get("telemetry.url", Conf)}
, {report_interval, conf_get("telemetry.report_interval", Conf)}
].
[{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
%% helpers
%% Build the discovery options for each cluster strategy
%% (used by tr_cluster__discovery/1).
%% static: a fixed seed-node list.
options(static, Conf) ->
[{seeds, [list_to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, "")]}];
%% mcast: multicast discovery; addr/iface must parse as IP addresses.
options(mcast, Conf) ->
{ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
{ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
Ports = [list_to_integer(S) || S <- conf_get("cluster.mcast.ports", Conf)],
[{addr, Addr}, {ports, Ports}, {iface, Iface},
{ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
{loop, conf_get("cluster.mcast.loop", Conf, true)}];
%% dns: DNS SRV/A record based discovery.
options(dns, Conf) ->
[{name, conf_get("cluster.dns.name", Conf)},
{app, conf_get("cluster.dns.app", Conf)}];
%% etcd: etcd-registry discovery; SSL options collected from the
%% "cluster.etcd.ssl" namespace, undefined values filtered out.
options(etcd, Conf) ->
Namespace = "cluster.etcd.ssl",
SslOpts = fun(C) ->
Options = keys(Namespace, C),
lists:map(fun(Key) -> {list_to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end,
[{server, conf_get("cluster.etcd.server", Conf)},
{prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
{node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
{ssl_options, filter(SslOpts(Conf))}];
%% k8s: Kubernetes API based discovery.
options(k8s, Conf) ->
[{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
{service_name, conf_get("cluster.k8s.service_name", Conf)},
{address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
{app_name, conf_get("cluster.k8s.app_name", Conf)},
{namespace, conf_get("cluster.k8s.namespace", Conf)},
{suffix, conf_get("cluster.k8s.suffix", Conf, "")}];
%% manual: no automatic discovery, hence no options.
options(manual, _Conf) ->
[].
formatter(json, CharsLimit, SingleLine) ->
{emqx_logger_jsonfmt,
#{chars_limit => CharsLimit,
@ -905,117 +595,7 @@ formatter(text, CharsLimit, SingleLine) ->
single_line => SingleLine
}}.
%% Parse the "log.burst_limit" setting into {Enabled, {MaxCount, WindowMs}}.
%% ["disabled"] turns bursting off but still returns the default
%% {20000, 1000}; [Count, Window] enables it with the parsed values,
%% raising error({duration, R}) if the window fails to parse.
burst_limit(["disabled"]) ->
{false, {20000, 1000}};
burst_limit([Count, Window]) ->
{true, {list_to_integer(Count),
case to_duration(Window) of
{ok, I} -> I;
{error, R} -> error({duration, R})
end}}.
%% For creating additional log files for specific log levels.
%% Scans "log.<Level>.file" for each of the eight standard levels and
%% returns [{LevelString, Filename}] for the ones that are configured.
additional_log_files(Conf) ->
LogLevel = ["debug", "info", "notice", "warning",
"error", "critical", "alert", "emergency"],
additional_log_files(Conf, LogLevel, []).
%% Accumulator clause: all levels inspected.
additional_log_files(_Conf, [], Acc) ->
Acc;
%% Keep only levels that have an explicit file configured.
additional_log_files(Conf, [L | More], Acc) ->
case conf_get(["log", L, "file"], Conf) of
undefined -> additional_log_files(Conf, More, Acc);
F -> additional_log_files(Conf, More, [{L, F} | Acc])
end.
%% Parse a [Bytes, Duration] string pair into {Limit, Seconds}.
%% Bytes are parsed with to_bytesize/1 and the duration with
%% to_duration_s/1 (project helpers); failures raise
%% error({bytesize, R}) / error({duration, R}) respectively.
rate_limit_byte_dur([L, D]) ->
Limit = case to_bytesize(L) of
{ok, I0} -> I0;
{error, R0} -> error({bytesize, R0})
end,
Duration = case to_duration_s(D) of
{ok, I1} -> I1;
{error, R1} -> error({duration, R1})
end,
{Limit, Duration}.
%% Parse a [Count, Duration] string pair into {Limit, Seconds}.
%% Count must be a plain integer string; the duration is parsed with
%% to_duration_s/1 (project helper). Parse failures raise error/1.
rate_limit_num_dur([L, D]) ->
Limit = case string:to_integer(L) of
{Int, []} when is_integer(Int) -> Int;
%% Fixed: previous message said "bytesize", copy-pasted from
%% rate_limit_byte_dur/1 — this clause parses a message count.
_ -> error("failed to parse integer string")
end,
Duration = case to_duration_s(D) of
{ok, I} -> I;
{error, Reason} -> error(Reason)
end,
{Limit, Duration}.
%% Map one zone key/value pair from the config into the internal
%% {SettingName, Value} form used by tr_zones/1. Unset values map to
%% {undefined, undefined} so they can be filtered out by the caller.
map_zones(_, undefined) ->
{undefined, undefined};
%% "topic= prio" entries build a priority map; ["none"] disables it.
%% Priorities must fall in 0..255 or error({bad_priority, _, _}) is raised.
map_zones("mqueue_priorities", Val) ->
case Val of
["none"] -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
_ ->
MqueuePriorities = lists:foldl(fun(T, Acc) ->
%% NOTE: space in "= " is intended
[Topic, Prio] = string:tokens(T, "= "),
P = list_to_integer(Prio),
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
maps:put(iolist_to_binary(Topic), P, Acc)
end, #{}, Val),
{mqueue_priorities, MqueuePriorities}
end;
map_zones("response_information", Val) ->
{response_information, iolist_to_binary(Val)};
%% Rate limits: optional per-connection message and byte limits,
%% each parsed as a [Limit, Duration] pair.
map_zones("rate_limit", Conf) ->
Messages = case conf_get("conn_messages_in", #{value => Conf}) of
undefined ->
[];
M ->
[{conn_messages_in, rate_limit_num_dur(M)}]
end,
Bytes = case conf_get("conn_bytes_in", #{value => Conf}) of
undefined ->
[];
B ->
[{conn_bytes_in, rate_limit_byte_dur(B)}]
end,
{ratelimit, Messages ++ Bytes};
%% Congestion alarm settings; both sub-keys are optional.
map_zones("conn_congestion", Conf) ->
Alarm = case conf_get("alarm", #{value => Conf}) of
undefined ->
[];
A ->
[{conn_congestion_alarm_enabled, A}]
end,
MinAlarm = case conf_get("min_alarm_sustain_duration", #{value => Conf}) of
undefined ->
[];
M ->
[{conn_congestion_min_alarm_sustain_duration, M}]
end,
Alarm ++ MinAlarm;
%% Routing quotas: per-connection and overall message limits.
map_zones("quota", Conf) ->
Conn = case conf_get("conn_messages_routing", #{value => Conf}) of
undefined ->
[];
C ->
[{conn_messages_routing, rate_limit_num_dur(C)}]
end,
Overall = case conf_get("overall_messages_routing", #{value => Conf}) of
undefined ->
[];
O ->
[{overall_messages_routing, rate_limit_num_dur(O)}]
end,
{quota, Conn ++ Overall};
%% Fallback: pass the value through under the atomized key name.
map_zones(Opt, Val) ->
{list_to_atom(Opt), Val}.
%% utils
-spec(conf_get(string() | [string()], hocon:config()) -> term()).
conf_get(Key, Conf) ->
V = hocon_schema:deep_get(Key, Conf, value),