diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 4c8113cd5..3ea51ec3f 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -336,7 +336,7 @@ log { ## @doc log.console_handler.enable ## ValueType: Boolean ## Default: false - console_handler.enable = false + console_handler.enable: false ## The log level of this handler ## All the log messages with levels lower than this level will @@ -345,13 +345,13 @@ log { ## @doc log.console_handler.level ## ValueType: debug | info | notice | warning | error | critical | alert | emergency ## Default: warning - console_handler.level = warning + console_handler.level: warning ##---------------------------------------------------------------- ## The file log handlers send log messages to files ##---------------------------------------------------------------- ## file_handlers. - file_handlers.emqx_default: { + file_handlers.emqx_log: { ## The log level filter of this handler ## All the log messages with levels lower than this level will ## be dropped. @@ -383,7 +383,7 @@ log { ## @doc log.file_handlers..rotation.enable ## ValueType: Boolean ## Default: true - rotation.enable = true + rotation.enable: true ## Maximum rotation count of log files. ## @@ -409,7 +409,7 @@ log { ## ## You could also create multiple file handlers for different ## log level for example: - file_handlers.emqx_error: { + file_handlers.emqx_error_log: { level: error file: "{{ platform_log_dir }}/error.log" } @@ -422,7 +422,7 @@ log { ## - "utc" for Universal Coordinated Time (UTC) ## - "+hh:mm" or "-hh:mm" for a specified offset ## Default: system - time_offset = system + time_offset: system ## Limits the total number of characters printed for each log event. ## @@ -437,7 +437,7 @@ log { ## @doc log.max_depth ## ValueType: Integer | infinity ## Default: 80 - max_depth = 80 + max_depth: 80 ## Log formatter ## @doc log.formatter @@ -2120,7 +2120,7 @@ example_common_ssl_options { ## @doc listeners..ssl.depth ## ValueType: Number ## Default: 10 - ssl.depth = 10 + ssl.depth: 10 ## String containing the user's password. Only used if the private keyfile ## is password-protected. @@ -2130,7 +2130,7 @@ example_common_ssl_options { ## @doc listeners..ssl.depth ## ValueType: String ## Default: "" - #ssl.key_password = "" + #ssl.key_password: "" ## The Ephemeral Diffie-Helman key exchange is a very effective way of ## ensuring Forward Secrecy by exchanging a set of keys that never hit @@ -2248,14 +2248,14 @@ example_common_websocket_options { ## @doc listeners..websocket.fail_if_no_subprotocol ## ValueType: Boolean ## Default: true - websocket.fail_if_no_subprotocol = true + websocket.fail_if_no_subprotocol: true ## Enable origin check in header for websocket connection ## ## @doc listeners..websocket.check_origin_enable ## ValueType: Boolean ## Default: false - websocket.check_origin_enable = false + websocket.check_origin_enable: false ## Allow origin to be absent in header in websocket connection ## when check_origin_enable is true @@ -2263,7 +2263,7 @@ example_common_websocket_options { ## @doc listeners..websocket.allow_origin_absence ## ValueType: Boolean ## Default: true - websocket.allow_origin_absence = true + websocket.allow_origin_absence: true ## Comma separated list of allowed origin in header for websocket connection ## @@ -2273,7 +2273,7 @@ example_common_websocket_options { ## local http dashboard url ## check_origins: "http://localhost:18083, http://127.0.0.1:18083" ## Default: "" - websocket.check_origins = "http://localhost:18083, http://127.0.0.1:18083" + websocket.check_origins: "http://localhost:18083, http://127.0.0.1:18083" ## Specify which HTTP header for real source IP if the EMQ X cluster is ## deployed behind NGINX or HAProxy. @@ -2281,7 +2281,7 @@ example_common_websocket_options { ## @doc listeners..websocket.proxy_address_header ## ValueType: String ## Default: X-Forwarded-For - websocket.proxy_address_header = X-Forwarded-For + websocket.proxy_address_header: X-Forwarded-For ## Specify which HTTP header for real source port if the EMQ X cluster is ## deployed behind NGINX or HAProxy. @@ -2289,7 +2289,7 @@ example_common_websocket_options { ## @doc listeners..websocket.proxy_port_header ## ValueType: String ## Default: X-Forwarded-Port - websocket.proxy_port_header = X-Forwarded-Port + websocket.proxy_port_header: X-Forwarded-Port websocket.deflate_opts { ## The level of deflate options for external WebSocket connections. @@ -2345,4 +2345,4 @@ example_common_websocket_options { ## Default: 15 client_max_window_bits: 15 } -} \ No newline at end of file +} diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 292d5569a..3ec691bc0 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -508,72 +508,15 @@ mqtt_listener() -> , {"proxy_protocol_timeout", t(duration())} ]. -translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"]. - -translation("ekka") -> - [ {"cluster_discovery", fun tr_cluster__discovery/1}]; - -translation("vm_args") -> - [ {"+zdbbl", fun tr_zdbbl/1} - , {"-heart", fun tr_heart/1}]; - -translation("gen_rpc") -> - [ {"tcp_client_num", fun tr_tcp_client_num/1} - , {"tcp_client_port", fun tr_tcp_client_port/1}]; +translations() -> ["kernel"]. translation("kernel") -> [ {"logger_level", fun tr_logger_level/1} - , {"logger", fun tr_logger/1}]; + , {"logger", fun tr_logger/1}]. -translation("emqx") -> - [ {"flapping_detect_policy", fun tr_flapping_detect_policy/1} - , {"zones", fun tr_zones/1} - , {"listeners", fun tr_listeners/1} - , {"modules", fun tr_modules/1} - , {"alarm", fun tr_alarm/1} - , {"telemetry", fun tr_telemetry/1} - ]. - -tr_cluster__discovery(Conf) -> - Strategy = conf_get("cluster.discovery", Conf), - {Strategy, filter(options(Strategy, Conf))}. - -tr_heart(Conf) -> - case conf_get("node.heartbeat", Conf) of - true -> ""; - "on" -> ""; - _ -> undefined - end. - -tr_zdbbl(Conf) -> - case conf_get("node.dist_buffer_size", Conf) of - undefined -> undefined; - X when is_integer(X) -> ceiling(X / 1024); %% Bytes to Kilobytes; - _ -> undefined - end. - -%% Force client to use server listening port, because we do no provide -%% per-node listening port manual mapping from configs. -%% i.e. all nodes in the cluster should agree to the same -%% listening port number. -tr_tcp_client_num(Conf) -> - case conf_get("rpc.tcp_client_num", Conf) of - 0 -> max(1, erlang:system_info(schedulers) div 2); - V -> V - end. - -tr_tcp_client_port(Conf) -> - conf_get("rpc.tcp_server_port", Conf). - -tr_logger_level(Conf) -> conf_get("log.level", Conf). +tr_logger_level(Conf) -> conf_get("log.primary_level", Conf). tr_logger(Conf) -> - LogTo = conf_get("log.to", Conf), - LogLevel = conf_get("log.level", Conf), - LogType = case conf_get("log.rotation.enable", Conf) of - true -> wrap; - _ -> halt - end, CharsLimit = case conf_get("log.chars_limit", Conf) of -1 -> unlimited; V -> V @@ -581,309 +524,56 @@ tr_logger(Conf) -> SingleLine = conf_get("log.single_line", Conf), FmtName = conf_get("log.formatter", Conf), Formatter = formatter(FmtName, CharsLimit, SingleLine), - BurstLimit = conf_get("log.burst_limit", Conf), - {BustLimitOn, {MaxBurstCount, TimeWindow}} = burst_limit(BurstLimit), - FileConf = fun (Filename) -> - BasicConf = - #{type => LogType, - file => filename:join(conf_get("log.dir", Conf), Filename), - max_no_files => conf_get("log.rotation.count", Conf), - sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf), - drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf), - flush_qlen => conf_get("log.flush_qlen", Conf), - overload_kill_enable => conf_get("log.overload_kill", Conf), - overload_kill_qlen => conf_get("log.overload_kill_qlen", Conf), - overload_kill_mem_size => conf_get("log.overload_kill_mem_size", Conf), - overload_kill_restart_after => conf_get("log.overload_kill_restart_after", Conf), - burst_limit_enable => BustLimitOn, - burst_limit_max_count => MaxBurstCount, - burst_limit_window_time => TimeWindow - }, - MaxNoBytes = case LogType of - wrap -> conf_get("log.rotation.size", Conf); - halt -> conf_get("log.size", Conf) - end, - BasicConf#{max_no_bytes => MaxNoBytes} end, - + BasicConf = #{ + sync_mode_qlen => conf_get("log.sync_mode_qlen", Conf), + drop_mode_qlen => conf_get("log.drop_mode_qlen", Conf), + flush_qlen => conf_get("log.flush_qlen", Conf), + overload_kill_enable => conf_get("log.overload_kill.enable", Conf), + overload_kill_qlen => conf_get("log.overload_kill.qlen", Conf), + overload_kill_mem_size => conf_get("log.overload_kill.mem_size", Conf), + overload_kill_restart_after => conf_get("log.overload_kill.restart_after", Conf), + burst_limit_enable => conf_get("log.burst_limit.enable", Conf), + burst_limit_max_count => conf_get("log.burst_limit.max_count", Conf), + burst_limit_window_time => conf_get("log.burst_limit.window_time", Conf) + }, Filters = case conf_get("log.supervisor_reports", Conf) of error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]; progress -> [] end, - %% For the default logger that outputs to console - DefaultHandler = - if LogTo =:= console orelse LogTo =:= both -> - [{handler, console, logger_std_h, - #{level => LogLevel, - config => #{type => standard_io}, - formatter => Formatter, - filters => Filters - } - }]; + ConsoleHandler = + case conf_get("log.console_handler.enable", Conf) of true -> - [{handler, default, undefined}] - end, - - %% For the file logger - FileHandler = - if LogTo =:= file orelse LogTo =:= both -> - [{handler, file, logger_disk_log_h, - #{level => LogLevel, - config => FileConf(conf_get("log.file", Conf)), + [{handler, console, logger_std_h, #{ + level => conf_get("log.console_handler.level", Conf), + config => BasicConf#{type => standard_io}, formatter => Formatter, - filesync_repeat_interval => no_repeat, filters => Filters }}]; - true -> [] + false -> [] end, - - AdditionalLogFiles = additional_log_files(Conf), - AdditionalHandlers = - [{handler, list_to_atom("file_for_"++Level), logger_disk_log_h, - #{level => list_to_atom(Level), - config => FileConf(Filename), + %% For the file logger + FileHandlers = + [{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{ + level => conf_get("level", SubConf), + config => BasicConf#{ + type => case conf_get("rotation.enable", SubConf) of + true -> wrap; + _ -> halt + end, + file => conf_get("file", SubConf), + max_no_files => conf_get("rotation.count", SubConf), + max_no_bytes => conf_get("max_size", SubConf) + }, formatter => Formatter, - filesync_repeat_interval => no_repeat}} - || {Level, Filename} <- AdditionalLogFiles], + filters => Filters, + filesync_repeat_interval => no_repeat + }} + || {HandlerName, SubConf} <- maps:to_list(conf_get("log.file_handlers", Conf))], - DefaultHandler ++ FileHandler ++ AdditionalHandlers. - -tr_flapping_detect_policy(Conf) -> - [Threshold, Duration, Interval] = conf_get("acl.flapping_detect_policy", Conf), - ParseDuration = fun(S, F) -> - case F(S) of - {ok, I} -> I; - {error, Reason} -> error({duration, Reason}) - end end, - #{threshold => list_to_integer(Threshold), - duration => ParseDuration(Duration, fun to_duration/1), - banned_interval => ParseDuration(Interval, fun to_duration_s/1) - }. - -tr_zones(Conf) -> - Names = lists:usort(keys("zone", Conf)), - lists:foldl( - fun(Name, Zones) -> - Zone = keys("zone." ++ Name, Conf), - Mapped = lists:flatten([map_zones(K, conf_get(["zone", Name, K], Conf)) || K <- Zone]), - [{list_to_atom(Name), lists:filter(fun ({K, []}) when K =:= ratelimit; K =:= quota -> false; - ({_, undefined}) -> false; - (_) -> true end, Mapped)} | Zones] - end, [], Names). - -tr_listeners(Conf) -> - Atom = fun(undefined) -> undefined; - (B) when is_binary(B)-> binary_to_atom(B); - (S) when is_list(S) -> list_to_atom(S) end, - - Access = fun(S) -> - [A, CIDR] = string:tokens(S, " "), - {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} - end, - - AccOpts = fun(Prefix) -> - case keys(Prefix ++ ".access", Conf) of - [] -> []; - Ids -> - [{access_rules, [Access(conf_get(Prefix ++ ".access." ++ Id, Conf)) || Id <- Ids]}] - end end, - - RateLimit = fun(undefined) -> - undefined; - ([L, D]) -> - Limit = case to_bytesize(L) of - {ok, I0} -> I0; - {error, R0} -> error({bytesize, R0}) - end, - Duration = case to_duration_s(D) of - {ok, I1} -> I1; - {error, R1} -> error({duration, R1}) - end, - {Limit, Duration} - end, - - CheckOrigin = fun(S) -> [ list_to_binary(string:trim(O)) || O <- S] end, - - WsOpts = fun(Prefix) -> - case conf_get(Prefix ++ ".check_origins", Conf) of - undefined -> undefined; - Rules -> lists:flatten(CheckOrigin(Rules)) - end - end, - - LisOpts = fun(Prefix) -> - filter([{acceptors, conf_get(Prefix ++ ".acceptors", Conf)}, - {mqtt_path, conf_get(Prefix ++ ".mqtt_path", Conf)}, - {max_connections, conf_get(Prefix ++ ".max_connections", Conf)}, - {max_conn_rate, conf_get(Prefix ++ ".max_conn_rate", Conf)}, - {active_n, conf_get(Prefix ++ ".active_n", Conf)}, - {tune_buffer, conf_get(Prefix ++ ".tune_buffer", Conf)}, - {zone, Atom(conf_get(Prefix ++ ".zone", Conf))}, - {rate_limit, RateLimit(conf_get(Prefix ++ ".rate_limit", Conf))}, - {proxy_protocol, conf_get(Prefix ++ ".proxy_protocol", Conf)}, - {proxy_address_header, list_to_binary(string:lowercase(conf_get(Prefix ++ ".proxy_address_header", Conf, <<"">>)))}, - {proxy_port_header, list_to_binary(string:lowercase(conf_get(Prefix ++ ".proxy_port_header", Conf, <<"">>)))}, - {proxy_protocol_timeout, conf_get(Prefix ++ ".proxy_protocol_timeout", Conf)}, - {fail_if_no_subprotocol, conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf)}, - {supported_subprotocols, string:tokens(conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")}, - {peer_cert_as_username, conf_get(Prefix ++ ".peer_cert_as_username", Conf)}, - {peer_cert_as_clientid, conf_get(Prefix ++ ".peer_cert_as_clientid", Conf)}, - {compress, conf_get(Prefix ++ ".compress", Conf)}, - {idle_timeout, conf_get(Prefix ++ ".idle_timeout", Conf)}, - {max_frame_size, conf_get(Prefix ++ ".max_frame_size", Conf)}, - {mqtt_piggyback, conf_get(Prefix ++ ".mqtt_piggyback", Conf)}, - {check_origin_enable, conf_get(Prefix ++ ".check_origin_enable", Conf)}, - {allow_origin_absence, conf_get(Prefix ++ ".allow_origin_absence", Conf)}, - {check_origins, WsOpts(Prefix)} | AccOpts(Prefix)]) - end, - DeflateOpts = fun(Prefix) -> - filter([{level, conf_get(Prefix ++ ".deflate_opts.level", Conf)}, - {mem_level, conf_get(Prefix ++ ".deflate_opts.mem_level", Conf)}, - {strategy, conf_get(Prefix ++ ".deflate_opts.strategy", Conf)}, - {server_context_takeover, conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf)}, - {client_context_takeover, conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf)}, - {server_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf)}, - {client_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf)}]) - end, - TcpOpts = fun(Prefix) -> - filter([{backlog, conf_get(Prefix ++ ".backlog", Conf)}, - {send_timeout, conf_get(Prefix ++ ".send_timeout", Conf)}, - {send_timeout_close, conf_get(Prefix ++ ".send_timeout_close", Conf)}, - {recbuf, conf_get(Prefix ++ ".recbuf", Conf)}, - {sndbuf, conf_get(Prefix ++ ".sndbuf", Conf)}, - {buffer, conf_get(Prefix ++ ".buffer", Conf)}, - {high_watermark, conf_get(Prefix ++ ".high_watermark", Conf)}, - {nodelay, conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, conf_get(Prefix ++ ".reuseaddr", Conf)}]) - end, - - SslOpts = fun(Prefix) -> - Opts = tr_ssl(Prefix, Conf), - case lists:keyfind(ciphers, 1, Opts) of - false -> - error(Prefix ++ ".ciphers or " ++ Prefix ++ ".psk_ciphers is absent"); - _ -> - Opts - end end, - - TcpListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - ListenOnN = case conf_get(Prefix ++ ".endpoint", Conf) of - undefined -> []; - ListenOn -> ListenOn - end, - [#{ proto => Atom(Type) - , name => Name - , listen_on => ListenOnN - , opts => [ {deflate_options, DeflateOpts(Prefix)} - , {tcp_options, TcpOpts(Prefix)} - | LisOpts(Prefix) - ] - } - ] - end, - SslListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case conf_get(Prefix ++ ".endpoint", Conf) of - undefined -> - []; - ListenOn -> - [#{ proto => Atom(Type) - , name => Name - , listen_on => ListenOn - , opts => [ {deflate_options, DeflateOpts(Prefix)} - , {tcp_options, TcpOpts(Prefix)} - , {ssl_options, SslOpts(Prefix)} - | LisOpts(Prefix) - ] - } - ] - end end, - - - lists:flatten([TcpListeners("tcp", Name) || Name <- keys("listener.tcp", Conf)] - ++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)] - ++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)] - ++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]). - -tr_modules(Conf) -> - Subscriptions = fun() -> - List = keys("module.subscription", Conf), - TopicList = [{N, conf_get(["module", "subscription", N, "topic"], Conf)}|| N <- List], - [{list_to_binary(T), #{ qos => conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), - nl => conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), - rap => conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), - rh => conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) - }} || {N, T} <- TopicList] - end, - Rewrites = fun() -> - Rules = keys("module.rewrite.rule", Conf), - PubRules = keys("module.rewrite.pub_rule", Conf), - SubRules = keys("module.rewrite.sub_rule", Conf), - TotalRules = - [ {["module", "rewrite", "pub", "rule", R], conf_get(["module.rewrite.rule", R], Conf)} || R <- Rules] ++ - [ {["module", "rewrite", "pub", "rule", R], conf_get(["module.rewrite.pub_rule", R], Conf)} || R <- PubRules] ++ - [ {["module", "rewrite", "sub", "rule", R], conf_get(["module.rewrite.rule", R], Conf)} || R <- Rules] ++ - [ {["module", "rewrite", "sub", "rule", R], conf_get(["module.rewrite.sub_rule", R], Conf)} || R <- SubRules], - lists:map(fun({[_, "rewrite", PubOrSub, "rule", _], Rule}) -> - [Topic, Re, Dest] = string:tokens(Rule, " "), - {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} - end, TotalRules) - end, - lists:append([ - [{emqx_mod_presence, [{qos, conf_get("module.presence.qos", Conf, 1)}]}], - [{emqx_mod_subscription, Subscriptions()}], - [{emqx_mod_rewrite, Rewrites()}], - [{emqx_mod_topic_metrics, []}], - [{emqx_mod_delayed, []}], - [{emqx_mod_acl_internal, [{acl_file, conf_get("acl.acl_file", Conf)}]}] - ]). - -tr_alarm(Conf) -> - [ {actions, [list_to_atom(Action) || Action <- conf_get("alarm.actions", Conf)]} - , {size_limit, conf_get("alarm.size_limit", Conf)} - , {validity_period, conf_get("alarm.validity_period", Conf)} - ]. - -tr_telemetry(Conf) -> - [ {enabled, conf_get("telemetry.enabled", Conf)} - , {url, conf_get("telemetry.url", Conf)} - , {report_interval, conf_get("telemetry.report_interval", Conf)} - ]. + [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers. %% helpers - -options(static, Conf) -> - [{seeds, [list_to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, "")]}]; -options(mcast, Conf) -> - {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)), - {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)), - Ports = [list_to_integer(S) || S <- conf_get("cluster.mcast.ports", Conf)], - [{addr, Addr}, {ports, Ports}, {iface, Iface}, - {ttl, conf_get("cluster.mcast.ttl", Conf, 1)}, - {loop, conf_get("cluster.mcast.loop", Conf, true)}]; -options(dns, Conf) -> - [{name, conf_get("cluster.dns.name", Conf)}, - {app, conf_get("cluster.dns.app", Conf)}]; -options(etcd, Conf) -> - Namespace = "cluster.etcd.ssl", - SslOpts = fun(C) -> - Options = keys(Namespace, C), - lists:map(fun(Key) -> {list_to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end, - [{server, conf_get("cluster.etcd.server", Conf)}, - {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")}, - {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)}, - {ssl_options, filter(SslOpts(Conf))}]; -options(k8s, Conf) -> - [{apiserver, conf_get("cluster.k8s.apiserver", Conf)}, - {service_name, conf_get("cluster.k8s.service_name", Conf)}, - {address_type, conf_get("cluster.k8s.address_type", Conf, ip)}, - {app_name, conf_get("cluster.k8s.app_name", Conf)}, - {namespace, conf_get("cluster.k8s.namespace", Conf)}, - {suffix, conf_get("cluster.k8s.suffix", Conf, "")}]; -options(manual, _Conf) -> - []. - formatter(json, CharsLimit, SingleLine) -> {emqx_logger_jsonfmt, #{chars_limit => CharsLimit, @@ -905,117 +595,7 @@ formatter(text, CharsLimit, SingleLine) -> single_line => SingleLine }}. -burst_limit(["disabled"]) -> - {false, {20000, 1000}}; -burst_limit([Count, Window]) -> - {true, {list_to_integer(Count), - case to_duration(Window) of - {ok, I} -> I; - {error, R} -> error({duration, R}) - end}}. - -%% For creating additional log files for specific log levels. -additional_log_files(Conf) -> - LogLevel = ["debug", "info", "notice", "warning", - "error", "critical", "alert", "emergency"], - additional_log_files(Conf, LogLevel, []). - -additional_log_files(_Conf, [], Acc) -> - Acc; -additional_log_files(Conf, [L | More], Acc) -> - case conf_get(["log", L, "file"], Conf) of - undefined -> additional_log_files(Conf, More, Acc); - F -> additional_log_files(Conf, More, [{L, F} | Acc]) - end. - -rate_limit_byte_dur([L, D]) -> - Limit = case to_bytesize(L) of - {ok, I0} -> I0; - {error, R0} -> error({bytesize, R0}) - end, - Duration = case to_duration_s(D) of - {ok, I1} -> I1; - {error, R1} -> error({duration, R1}) - end, - {Limit, Duration}. - -rate_limit_num_dur([L, D]) -> - Limit = case string:to_integer(L) of - {Int, []} when is_integer(Int) -> Int; - _ -> error("failed to parse bytesize string") - end, - Duration = case to_duration_s(D) of - {ok, I} -> I; - {error, Reason} -> error(Reason) - end, - {Limit, Duration}. - -map_zones(_, undefined) -> - {undefined, undefined}; - -map_zones("mqueue_priorities", Val) -> - case Val of - ["none"] -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE - _ -> - MqueuePriorities = lists:foldl(fun(T, Acc) -> - %% NOTE: space in "= " is intended - [Topic, Prio] = string:tokens(T, "= "), - P = list_to_integer(Prio), - (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), - maps:put(iolist_to_binary(Topic), P, Acc) - end, #{}, Val), - {mqueue_priorities, MqueuePriorities} - end; -map_zones("response_information", Val) -> - {response_information, iolist_to_binary(Val)}; -map_zones("rate_limit", Conf) -> - Messages = case conf_get("conn_messages_in", #{value => Conf}) of - undefined -> - []; - M -> - [{conn_messages_in, rate_limit_num_dur(M)}] - end, - Bytes = case conf_get("conn_bytes_in", #{value => Conf}) of - undefined -> - []; - B -> - [{conn_bytes_in, rate_limit_byte_dur(B)}] - end, - {ratelimit, Messages ++ Bytes}; -map_zones("conn_congestion", Conf) -> - Alarm = case conf_get("alarm", #{value => Conf}) of - undefined -> - []; - A -> - [{conn_congestion_alarm_enabled, A}] - end, - MinAlarm = case conf_get("min_alarm_sustain_duration", #{value => Conf}) of - undefined -> - []; - M -> - [{conn_congestion_min_alarm_sustain_duration, M}] - end, - Alarm ++ MinAlarm; -map_zones("quota", Conf) -> - Conn = case conf_get("conn_messages_routing", #{value => Conf}) of - undefined -> - []; - C -> - [{conn_messages_routing, rate_limit_num_dur(C)}] - end, - Overall = case conf_get("overall_messages_routing", #{value => Conf}) of - undefined -> - []; - O -> - [{overall_messages_routing, rate_limit_num_dur(O)}] - end, - {quota, Conn ++ Overall}; -map_zones(Opt, Val) -> - {list_to_atom(Opt), Val}. - - %% utils - -spec(conf_get(string() | [string()], hocon:config()) -> term()). conf_get(Key, Conf) -> V = hocon_schema:deep_get(Key, Conf, value),