1541 lines
48 KiB
Erlang
1541 lines
48 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_conf_schema).
|
|
|
|
-dialyzer(no_return).
|
|
-dialyzer(no_match).
|
|
-dialyzer(no_contracts).
|
|
-dialyzer(no_unused).
|
|
-dialyzer(no_fail_call).
|
|
|
|
-include_lib("emqx/include/emqx_access_control.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
-include("emqx_conf.hrl").
|
|
|
|
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
|
|
-type file() :: string().
|
|
-type cipher() :: map().
|
|
|
|
-behaviour(hocon_schema).
|
|
|
|
-reflect_type([
|
|
log_level/0,
|
|
file/0,
|
|
cipher/0
|
|
]).
|
|
|
|
-export([
|
|
namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0, desc/1, tags/0
|
|
]).
|
|
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
|
-export([upgrade_raw_conf/1]).
|
|
|
|
%% internal exports for `emqx_enterprise_schema' only.
|
|
-export([ensure_unicode_path/2, convert_rotation/2, log_handler_common_confs/2]).
|
|
|
|
%% Static apps which merge their configs into the merged emqx.conf
|
|
%% The list can not be made a dynamic read at run-time as it is used
|
|
%% by nodetool to generate app.<time>.config before EMQX is started
|
|
-define(MERGED_CONFIGS, [
|
|
emqx_bridge_schema,
|
|
emqx_connector_schema,
|
|
emqx_bridge_v2_schema,
|
|
emqx_retainer_schema,
|
|
emqx_authn_schema,
|
|
emqx_authz_schema,
|
|
emqx_auto_subscribe_schema,
|
|
{emqx_telemetry_schema, ce},
|
|
emqx_modules_schema,
|
|
emqx_plugins_schema,
|
|
emqx_dashboard_schema,
|
|
emqx_gateway_schema,
|
|
emqx_prometheus_schema,
|
|
emqx_rule_engine_schema,
|
|
emqx_exhook_schema,
|
|
emqx_psk_schema,
|
|
emqx_limiter_schema,
|
|
emqx_slow_subs_schema,
|
|
emqx_otel_schema,
|
|
emqx_mgmt_api_key_schema
|
|
]).
|
|
-define(INJECTING_CONFIGS, [
|
|
{emqx_authn_schema, ?AUTHN_PROVIDER_SCHEMA_MODS},
|
|
{emqx_authz_schema, ?AUTHZ_SOURCE_SCHEMA_MODS}
|
|
]).
|
|
|
|
%% 1 million default ports counter
|
|
-define(DEFAULT_MAX_PORTS, 1024 * 1024).
|
|
|
|
%% Callback to upgrade config after loaded from config file but before validation.
|
|
upgrade_raw_conf(RawConf) ->
|
|
emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf).
|
|
|
|
%% root config should not have a namespace
|
|
namespace() -> undefined.
|
|
|
|
tags() ->
|
|
[<<"EMQX">>].
|
|
|
|
roots() ->
|
|
ok = emqx_schema_hooks:inject_from_modules(?INJECTING_CONFIGS),
|
|
emqx_schema_high_prio_roots() ++
|
|
[
|
|
{"node",
|
|
sc(
|
|
?R_REF("node"),
|
|
#{
|
|
translate_to => ["emqx"]
|
|
}
|
|
)},
|
|
{"cluster",
|
|
sc(
|
|
?R_REF("cluster"),
|
|
#{translate_to => ["ekka"]}
|
|
)},
|
|
{"log",
|
|
sc(
|
|
?R_REF("log"),
|
|
#{
|
|
translate_to => ["kernel"],
|
|
importance => ?IMPORTANCE_HIGH
|
|
}
|
|
)},
|
|
{"rpc",
|
|
sc(
|
|
?R_REF("rpc"),
|
|
#{
|
|
translate_to => ["gen_rpc"],
|
|
importance => ?IMPORTANCE_LOW
|
|
}
|
|
)}
|
|
] ++
|
|
emqx_schema:roots(medium) ++
|
|
emqx_schema:roots(low) ++
|
|
lists:flatmap(fun roots/1, common_apps()).
|
|
|
|
validations() ->
|
|
hocon_schema:validations(emqx_schema) ++
|
|
lists:flatmap(fun hocon_schema:validations/1, common_apps()).
|
|
|
|
common_apps() ->
|
|
Edition = emqx_release:edition(),
|
|
lists:filtermap(
|
|
fun
|
|
({N, E}) ->
|
|
case E =:= Edition of
|
|
true -> {true, N};
|
|
false -> false
|
|
end;
|
|
(N) when is_atom(N) -> {true, N}
|
|
end,
|
|
?MERGED_CONFIGS
|
|
).
|
|
|
|
fields("cluster") ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
atom(),
|
|
#{
|
|
mapping => "ekka.cluster_name",
|
|
default => emqxcl,
|
|
desc => ?DESC(cluster_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"discovery_strategy",
|
|
sc(
|
|
hoconsc:enum([manual, static, dns, etcd, k8s, mcast]),
|
|
#{
|
|
default => manual,
|
|
desc => ?DESC(cluster_discovery_strategy),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"core_nodes",
|
|
sc(
|
|
node_array(),
|
|
#{
|
|
%% This config is nerver needed (since 5.0.0)
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
mapping => "mria.core_nodes",
|
|
default => [],
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_core_nodes)
|
|
}
|
|
)},
|
|
{"autoclean",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "mria.cluster_autoclean",
|
|
default => <<"24h">>,
|
|
desc => ?DESC(cluster_autoclean),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"autoheal",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
mapping => "mria.cluster_autoheal",
|
|
default => true,
|
|
desc => ?DESC(cluster_autoheal),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"proto_dist",
|
|
sc(
|
|
hoconsc:enum([inet_tcp, inet6_tcp, inet_tls, inet6_tls]),
|
|
#{
|
|
mapping => "ekka.proto_dist",
|
|
default => inet_tcp,
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_proto_dist)
|
|
}
|
|
)},
|
|
{"static",
|
|
sc(
|
|
?R_REF(cluster_static),
|
|
#{}
|
|
)},
|
|
{"mcast",
|
|
sc(
|
|
?R_REF(cluster_mcast),
|
|
#{importance => ?IMPORTANCE_HIDDEN}
|
|
)},
|
|
{"dns",
|
|
sc(
|
|
?R_REF(cluster_dns),
|
|
#{}
|
|
)},
|
|
{"etcd",
|
|
sc(
|
|
?R_REF(cluster_etcd),
|
|
#{}
|
|
)},
|
|
{"k8s",
|
|
sc(
|
|
?R_REF(cluster_k8s),
|
|
#{}
|
|
)},
|
|
{"prevent_overlapping_partitions",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
mapping => "vm_args.-kernel prevent_overlapping_partitions",
|
|
desc => ?DESC(prevent_overlapping_partitions),
|
|
default => false,
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_static) ->
|
|
[
|
|
{"seeds",
|
|
sc(
|
|
node_array(),
|
|
#{
|
|
default => [],
|
|
desc => ?DESC(cluster_static_seeds),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_mcast) ->
|
|
[
|
|
{"addr",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"239.192.0.1">>,
|
|
desc => ?DESC(cluster_mcast_addr),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"ports",
|
|
sc(
|
|
hoconsc:array(integer()),
|
|
#{
|
|
default => [4369, 4370],
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_mcast_ports)
|
|
}
|
|
)},
|
|
{"iface",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"0.0.0.0">>,
|
|
desc => ?DESC(cluster_mcast_iface),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"ttl",
|
|
sc(
|
|
range(0, 255),
|
|
#{
|
|
default => 255,
|
|
desc => ?DESC(cluster_mcast_ttl),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"loop",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC(cluster_mcast_loop),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"sndbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => <<"16KB">>,
|
|
desc => ?DESC(cluster_mcast_sndbuf),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"recbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => <<"16KB">>,
|
|
desc => ?DESC(cluster_mcast_recbuf),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"buffer",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => <<"32KB">>,
|
|
desc => ?DESC(cluster_mcast_buffer),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_dns) ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"localhost">>,
|
|
desc => ?DESC(cluster_dns_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"record_type",
|
|
sc(
|
|
hoconsc:enum([a, srv]),
|
|
#{
|
|
default => a,
|
|
desc => ?DESC(cluster_dns_record_type),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_etcd) ->
|
|
[
|
|
{"server",
|
|
sc(
|
|
emqx_schema:comma_separated_list(),
|
|
#{
|
|
desc => ?DESC(cluster_etcd_server),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"prefix",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"emqxcl">>,
|
|
desc => ?DESC(cluster_etcd_prefix),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"node_ttl",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
default => <<"1m">>,
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_etcd_node_ttl)
|
|
}
|
|
)},
|
|
{"ssl_options",
|
|
sc(
|
|
?R_REF(emqx_schema, "ssl_client_opts"),
|
|
#{
|
|
desc => ?DESC(cluster_etcd_ssl),
|
|
aliases => [ssl],
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_k8s) ->
|
|
[
|
|
{"apiserver",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"https://kubernetes.default.svc:443">>,
|
|
desc => ?DESC(cluster_k8s_apiserver),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"service_name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"emqx">>,
|
|
desc => ?DESC(cluster_k8s_service_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"address_type",
|
|
sc(
|
|
hoconsc:enum([ip, dns, hostname]),
|
|
#{
|
|
default => ip,
|
|
desc => ?DESC(cluster_k8s_address_type),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"namespace",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"default">>,
|
|
desc => ?DESC(cluster_k8s_namespace),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"suffix",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"pod.local">>,
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_k8s_suffix)
|
|
}
|
|
)}
|
|
];
|
|
fields("node") ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"emqx@127.0.0.1">>,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIGH,
|
|
desc => ?DESC(node_name)
|
|
}
|
|
)},
|
|
{"cookie",
|
|
sc(
|
|
string(),
|
|
#{
|
|
mapping => "vm_args.-setcookie",
|
|
required => true,
|
|
'readOnly' => true,
|
|
sensitive => true,
|
|
desc => ?DESC(node_cookie),
|
|
importance => ?IMPORTANCE_HIGH,
|
|
converter => fun emqx_schema:password_converter/2
|
|
}
|
|
)},
|
|
{"process_limit",
|
|
sc(
|
|
range(1024, 134217727),
|
|
#{
|
|
%% deprecated make sure it's disappeared in raw_conf user(HTTP API)
|
|
%% but still in vm.args via translation/1
|
|
%% ProcessLimit is always equal to MaxPort * 2 when translation/1.
|
|
deprecated => true,
|
|
desc => ?DESC(process_limit),
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"max_ports",
|
|
sc(
|
|
range(1024, 134217727),
|
|
#{
|
|
mapping => "vm_args.+Q",
|
|
desc => ?DESC(max_ports),
|
|
default => ?DEFAULT_MAX_PORTS,
|
|
importance => ?IMPORTANCE_HIGH,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"dist_buffer_size",
|
|
sc(
|
|
range(1, 2097151),
|
|
#{
|
|
mapping => "vm_args.+zdbbl",
|
|
desc => ?DESC(dist_buffer_size),
|
|
default => 8192,
|
|
importance => ?IMPORTANCE_LOW,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"max_ets_tables",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
mapping => "vm_args.+e",
|
|
desc => ?DESC(max_ets_tables),
|
|
default => 262144,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"data_dir",
|
|
sc(
|
|
string(),
|
|
#{
|
|
required => true,
|
|
'readOnly' => true,
|
|
mapping => "emqx.data_dir",
|
|
%% for now, it's tricky to use a different data_dir
|
|
%% otherwise data paths in cluster config may differ
|
|
%% TODO: change configurable data file paths to relative
|
|
importance => ?IMPORTANCE_LOW,
|
|
desc => ?DESC(node_data_dir)
|
|
}
|
|
)},
|
|
{"config_files",
|
|
sc(
|
|
hoconsc:array(string()),
|
|
#{
|
|
mapping => "emqx.config_files",
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
required => false,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"global_gc_interval",
|
|
sc(
|
|
hoconsc:union([disabled, emqx_schema:duration()]),
|
|
#{
|
|
mapping => "emqx_machine.global_gc_interval",
|
|
default => <<"15m">>,
|
|
desc => ?DESC(node_global_gc_interval),
|
|
importance => ?IMPORTANCE_LOW,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_file",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP",
|
|
desc => ?DESC(node_crash_dump_file),
|
|
default => crash_dump_file_default(),
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
converter => fun ensure_unicode_path/2,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_seconds",
|
|
sc(
|
|
emqx_schema:timeout_duration_s(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP_SECONDS",
|
|
default => <<"30s">>,
|
|
desc => ?DESC(node_crash_dump_seconds),
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_bytes",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP_BYTES",
|
|
default => <<"100MB">>,
|
|
desc => ?DESC(node_crash_dump_bytes),
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"dist_net_ticktime",
|
|
sc(
|
|
emqx_schema:timeout_duration_s(),
|
|
#{
|
|
mapping => "vm_args.-kernel net_ticktime",
|
|
default => <<"2m">>,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(node_dist_net_ticktime)
|
|
}
|
|
)},
|
|
{"backtrace_depth",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "emqx_machine.backtrace_depth",
|
|
default => 23,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(node_backtrace_depth)
|
|
}
|
|
)},
|
|
{"applications",
|
|
sc(
|
|
emqx_schema:comma_separated_atoms(),
|
|
#{
|
|
mapping => "emqx_machine.applications",
|
|
default => <<"">>,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(node_applications)
|
|
}
|
|
)},
|
|
{"etc_dir",
|
|
sc(
|
|
string(),
|
|
#{
|
|
desc => ?DESC(node_etc_dir),
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
deprecated => {since, "5.0.8"}
|
|
}
|
|
)},
|
|
{"cluster_call",
|
|
sc(
|
|
?R_REF("cluster_call"),
|
|
#{
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"db_backend",
|
|
sc(
|
|
hoconsc:enum([mnesia, rlog]),
|
|
#{
|
|
mapping => "mria.db_backend",
|
|
default => rlog,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(db_backend)
|
|
}
|
|
)},
|
|
{"role",
|
|
sc(
|
|
hoconsc:enum([core, replicant]),
|
|
#{
|
|
mapping => "mria.node_role",
|
|
default => core,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIGH,
|
|
aliases => [db_role],
|
|
desc => ?DESC(db_role)
|
|
}
|
|
)},
|
|
{"rpc_module",
|
|
sc(
|
|
hoconsc:enum([gen_rpc, rpc]),
|
|
#{
|
|
mapping => "mria.rlog_rpc_module",
|
|
default => rpc,
|
|
'readOnly' => true,
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(db_rpc_module)
|
|
}
|
|
)},
|
|
{"tlog_push_mode",
|
|
sc(
|
|
hoconsc:enum([sync, async]),
|
|
#{
|
|
mapping => "mria.tlog_push_mode",
|
|
default => async,
|
|
'readOnly' => true,
|
|
deprecated => {since, "5.2.0"},
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
desc => ?DESC(db_tlog_push_mode)
|
|
}
|
|
)},
|
|
{"default_shard_transport",
|
|
sc(
|
|
hoconsc:enum([gen_rpc, distr]),
|
|
#{
|
|
mapping => "mria.shard_transport",
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
default => gen_rpc,
|
|
desc => ?DESC(db_default_shard_transport)
|
|
}
|
|
)},
|
|
{"shard_transports",
|
|
sc(
|
|
map(shard, hoconsc:enum([gen_rpc, distr])),
|
|
#{
|
|
desc => ?DESC(db_shard_transports),
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
mapping => "emqx_machine.custom_shard_transports",
|
|
default => #{}
|
|
}
|
|
)},
|
|
{"default_bootstrap_batch_size",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
mapping => "mria.bootstrap_batch_size",
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
default => 500,
|
|
desc => ?DESC(db_default_bootstrap_batch_size)
|
|
}
|
|
)},
|
|
{"broker_pool_size",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
default => emqx_vm:schedulers() * 2,
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_broker_pool_size)
|
|
}
|
|
)},
|
|
{"generic_pool_size",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
default => emqx_vm:schedulers(),
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_generic_pool_size)
|
|
}
|
|
)},
|
|
{"channel_cleanup_batch_size",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
importance => ?IMPORTANCE_HIDDEN,
|
|
default => 100_000,
|
|
desc => ?DESC(node_channel_cleanup_batch_size)
|
|
}
|
|
)}
|
|
];
|
|
fields("cluster_call") ->
|
|
[
|
|
{"retry_interval",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
desc => ?DESC(cluster_call_retry_interval),
|
|
default => <<"1m">>
|
|
}
|
|
)},
|
|
{"max_history",
|
|
sc(
|
|
range(100, 10240),
|
|
#{
|
|
desc => ?DESC(cluster_call_max_history),
|
|
default => 1024
|
|
}
|
|
)},
|
|
{"cleanup_interval",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
desc => ?DESC(cluster_call_cleanup_interval),
|
|
default => <<"24h">>
|
|
}
|
|
)}
|
|
];
|
|
fields("rpc") ->
|
|
[
|
|
{"mode",
|
|
sc(
|
|
hoconsc:enum([sync, async]),
|
|
#{
|
|
default => async,
|
|
desc => ?DESC(rpc_mode)
|
|
}
|
|
)},
|
|
{"protocol",
|
|
sc(
|
|
hoconsc:enum([tcp, ssl]),
|
|
#{
|
|
mapping => "gen_rpc.driver",
|
|
aliases => [driver],
|
|
default => tcp,
|
|
desc => ?DESC(rpc_driver)
|
|
}
|
|
)},
|
|
{"async_batch_size",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.max_batch_size",
|
|
default => 256,
|
|
desc => ?DESC(rpc_async_batch_size)
|
|
}
|
|
)},
|
|
{"port_discovery",
|
|
sc(
|
|
hoconsc:enum([manual, stateless]),
|
|
#{
|
|
mapping => "gen_rpc.port_discovery",
|
|
default => stateless,
|
|
desc => ?DESC(rpc_port_discovery)
|
|
}
|
|
)},
|
|
{"tcp_server_port",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.tcp_server_port",
|
|
default => 5369,
|
|
desc => ?DESC(rpc_tcp_server_port)
|
|
}
|
|
)},
|
|
{"ssl_server_port",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.ssl_server_port",
|
|
default => 5369,
|
|
desc => ?DESC(rpc_ssl_server_port)
|
|
}
|
|
)},
|
|
{"tcp_client_num",
|
|
sc(
|
|
range(1, 256),
|
|
#{
|
|
default => 10,
|
|
desc => ?DESC(rpc_tcp_client_num)
|
|
}
|
|
)},
|
|
{"connect_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.connect_timeout",
|
|
default => <<"5s">>,
|
|
desc => ?DESC(rpc_connect_timeout)
|
|
}
|
|
)},
|
|
{"certfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.certfile",
|
|
converter => fun ensure_unicode_path/2,
|
|
desc => ?DESC(rpc_certfile)
|
|
}
|
|
)},
|
|
{"keyfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.keyfile",
|
|
converter => fun ensure_unicode_path/2,
|
|
desc => ?DESC(rpc_keyfile)
|
|
}
|
|
)},
|
|
{"cacertfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.cacertfile",
|
|
converter => fun ensure_unicode_path/2,
|
|
desc => ?DESC(rpc_cacertfile)
|
|
}
|
|
)},
|
|
{"send_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.send_timeout",
|
|
default => <<"5s">>,
|
|
desc => ?DESC(rpc_send_timeout)
|
|
}
|
|
)},
|
|
{"authentication_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.authentication_timeout",
|
|
default => <<"5s">>,
|
|
desc => ?DESC(rpc_authentication_timeout)
|
|
}
|
|
)},
|
|
{"call_receive_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.call_receive_timeout",
|
|
default => <<"15s">>,
|
|
desc => ?DESC(rpc_call_receive_timeout)
|
|
}
|
|
)},
|
|
{"socket_keepalive_idle",
|
|
sc(
|
|
emqx_schema:timeout_duration_s(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_idle",
|
|
default => <<"15m">>,
|
|
desc => ?DESC(rpc_socket_keepalive_idle)
|
|
}
|
|
)},
|
|
{"socket_keepalive_interval",
|
|
sc(
|
|
emqx_schema:timeout_duration_s(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_interval",
|
|
default => <<"75s">>,
|
|
desc => ?DESC(rpc_socket_keepalive_interval)
|
|
}
|
|
)},
|
|
{"socket_keepalive_count",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_count",
|
|
default => 9,
|
|
desc => ?DESC(rpc_socket_keepalive_count)
|
|
}
|
|
)},
|
|
{"socket_sndbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_sndbuf",
|
|
default => <<"1MB">>,
|
|
desc => ?DESC(rpc_socket_sndbuf)
|
|
}
|
|
)},
|
|
{"socket_recbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_recbuf",
|
|
default => <<"1MB">>,
|
|
desc => ?DESC(rpc_socket_recbuf)
|
|
}
|
|
)},
|
|
{"socket_buffer",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_buffer",
|
|
default => <<"1MB">>,
|
|
desc => ?DESC(rpc_socket_buffer)
|
|
}
|
|
)},
|
|
{"insecure_fallback",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
mapping => "gen_rpc.insecure_auth_fallback_allowed",
|
|
default => true,
|
|
desc => ?DESC(rpc_insecure_fallback)
|
|
}
|
|
)},
|
|
{"ciphers", emqx_schema:ciphers_schema(tls_all_available)},
|
|
{"tls_versions", emqx_schema:tls_versions_schema(tls_all_available)},
|
|
{"listen_address",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"0.0.0.0">>,
|
|
desc => ?DESC(rpc_listen_address),
|
|
importance => ?IMPORTANCE_MEDIUM
|
|
}
|
|
)},
|
|
{"ipv6_only",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => false,
|
|
mapping => "gen_rpc.ipv6_only",
|
|
desc => ?DESC(rpc_ipv6_only),
|
|
importance => ?IMPORTANCE_LOW
|
|
}
|
|
)}
|
|
];
|
|
fields("log") ->
|
|
[
|
|
{"console",
|
|
sc(?R_REF("console_handler"), #{
|
|
aliases => [console_handler],
|
|
importance => ?IMPORTANCE_HIGH
|
|
})},
|
|
{"file",
|
|
sc(
|
|
?UNION([
|
|
?R_REF("log_file_handler"),
|
|
?MAP(handler_name, ?R_REF("log_file_handler"))
|
|
]),
|
|
#{
|
|
desc => ?DESC("log_file_handlers"),
|
|
converter => fun ensure_file_handlers/2,
|
|
default => #{<<"level">> => <<"warning">>},
|
|
aliases => [file_handlers],
|
|
importance => ?IMPORTANCE_HIGH
|
|
}
|
|
)}
|
|
];
|
|
fields("console_handler") ->
|
|
log_handler_common_confs(console, #{});
|
|
fields("log_file_handler") ->
|
|
[
|
|
{"path",
|
|
sc(
|
|
file(),
|
|
#{
|
|
desc => ?DESC("log_file_handler_file"),
|
|
default => <<"${EMQX_LOG_DIR}/emqx.log">>,
|
|
aliases => [file, to],
|
|
importance => ?IMPORTANCE_HIGH,
|
|
converter => fun(Path, Opts) ->
|
|
emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
|
|
end
|
|
}
|
|
)},
|
|
{"rotation_count",
|
|
sc(
|
|
range(1, 128),
|
|
#{
|
|
aliases => [rotation],
|
|
default => 10,
|
|
converter => fun convert_rotation/2,
|
|
desc => ?DESC("log_rotation_count"),
|
|
importance => ?IMPORTANCE_MEDIUM
|
|
}
|
|
)},
|
|
{"rotation_size",
|
|
sc(
|
|
hoconsc:union([infinity, emqx_schema:bytesize()]),
|
|
#{
|
|
default => <<"50MB">>,
|
|
desc => ?DESC("log_file_handler_max_size"),
|
|
aliases => [max_size],
|
|
importance => ?IMPORTANCE_MEDIUM
|
|
}
|
|
)}
|
|
] ++ log_handler_common_confs(file, #{});
|
|
fields("log_overload_kill") ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("log_overload_kill_enable")
|
|
}
|
|
)},
|
|
{"mem_size",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => <<"30MB">>,
|
|
desc => ?DESC("log_overload_kill_mem_size")
|
|
}
|
|
)},
|
|
{"qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 20000,
|
|
desc => ?DESC("log_overload_kill_qlen")
|
|
}
|
|
)},
|
|
{"restart_after",
|
|
sc(
|
|
hoconsc:union([emqx_schema:timeout_duration_ms(), infinity]),
|
|
#{
|
|
default => <<"5s">>,
|
|
desc => ?DESC("log_overload_kill_restart_after")
|
|
}
|
|
)}
|
|
];
|
|
fields("log_burst_limit") ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("log_burst_limit_enable")
|
|
}
|
|
)},
|
|
{"max_count",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 10000,
|
|
desc => ?DESC("log_burst_limit_max_count")
|
|
}
|
|
)},
|
|
{"window_time",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
default => <<"1s">>,
|
|
desc => ?DESC("log_burst_limit_window_time")
|
|
}
|
|
)}
|
|
];
|
|
fields("authorization") ->
|
|
emqx_schema:authz_fields() ++
|
|
emqx_authz_schema:authz_fields().
|
|
|
|
desc("cluster") ->
|
|
?DESC("desc_cluster");
|
|
desc(cluster_static) ->
|
|
?DESC("desc_cluster_static");
|
|
desc(cluster_mcast) ->
|
|
?DESC("desc_cluster_mcast");
|
|
desc(cluster_dns) ->
|
|
?DESC("desc_cluster_dns");
|
|
desc(cluster_etcd) ->
|
|
?DESC("desc_cluster_etcd");
|
|
desc(cluster_k8s) ->
|
|
?DESC("desc_cluster_k8s");
|
|
desc("node") ->
|
|
?DESC("desc_node");
|
|
desc("cluster_call") ->
|
|
?DESC("desc_cluster_call");
|
|
desc("rpc") ->
|
|
?DESC("desc_rpc");
|
|
desc("log") ->
|
|
?DESC("desc_log");
|
|
desc("console_handler") ->
|
|
?DESC("desc_console_handler");
|
|
desc("log_file_handler") ->
|
|
?DESC("desc_log_file_handler");
|
|
desc("log_rotation") ->
|
|
?DESC("desc_log_rotation");
|
|
desc("log_overload_kill") ->
|
|
?DESC("desc_log_overload_kill");
|
|
desc("log_burst_limit") ->
|
|
?DESC("desc_log_burst_limit");
|
|
desc("authorization") ->
|
|
?DESC("desc_authorization");
|
|
desc(_) ->
|
|
undefined.
|
|
|
|
translations() -> ["ekka", "kernel", "emqx", "gen_rpc", "prometheus", "vm_args"].
|
|
|
|
translation("ekka") ->
|
|
[{"cluster_discovery", fun tr_cluster_discovery/1}];
|
|
translation("kernel") ->
|
|
[
|
|
{"logger_level", fun emqx_config_logger:tr_level/1},
|
|
{"logger", fun emqx_config_logger:tr_handlers/1},
|
|
{"error_logger", fun(_) -> silent end}
|
|
];
|
|
translation("emqx") ->
|
|
[
|
|
{"config_files", fun tr_config_files/1},
|
|
{"cluster_override_conf_file", fun tr_cluster_override_conf_file/1},
|
|
{"local_override_conf_file", fun tr_local_override_conf_file/1},
|
|
{"cluster_hocon_file", fun tr_cluster_hocon_file/1}
|
|
];
|
|
translation("gen_rpc") ->
|
|
[
|
|
{"default_client_driver", fun tr_gen_rpc_default_client_driver/1},
|
|
{"tcp_client_port", fun tr_gen_rpc_tcp_client_port/1},
|
|
{"ssl_client_port", fun tr_gen_rpc_ssl_client_port/1},
|
|
{"ssl_client_options", fun tr_gen_rpc_ssl_options/1},
|
|
{"ssl_server_options", fun tr_gen_rpc_ssl_options/1},
|
|
{"socket_ip", fun(Conf) ->
|
|
Addr = conf_get("rpc.listen_address", Conf),
|
|
case inet:parse_address(Addr) of
|
|
{ok, Tuple} ->
|
|
Tuple;
|
|
{error, _Reason} ->
|
|
throw(#{bad_ip_address => Addr})
|
|
end
|
|
end}
|
|
];
|
|
translation("prometheus") ->
|
|
[
|
|
{"collectors", fun tr_prometheus_collectors/1}
|
|
];
|
|
translation("vm_args") ->
|
|
[
|
|
{"+P", fun tr_vm_args_process_limit/1}
|
|
].
|
|
|
|
tr_vm_args_process_limit(Conf) ->
|
|
2 * conf_get("node.max_ports", Conf, ?DEFAULT_MAX_PORTS).
|
|
|
|
tr_prometheus_collectors(Conf) ->
|
|
[
|
|
%% builtin collectors
|
|
prometheus_boolean,
|
|
prometheus_counter,
|
|
prometheus_gauge,
|
|
prometheus_histogram,
|
|
prometheus_quantile_summary,
|
|
prometheus_summary,
|
|
%% emqx collectors
|
|
emqx_prometheus,
|
|
emqx_prometheus_mria
|
|
%% builtin vm collectors
|
|
| tr_vm_dist_collector(Conf) ++
|
|
tr_mnesia_collector(Conf) ++
|
|
tr_vm_statistics_collector(Conf) ++
|
|
tr_vm_system_info_collector(Conf) ++
|
|
tr_vm_memory_collector(Conf) ++
|
|
tr_vm_msacc_collector(Conf)
|
|
].
|
|
|
|
tr_vm_dist_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.vm_dist_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_vm_dist_collector).
|
|
|
|
tr_mnesia_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.mnesia_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_mnesia_collector).
|
|
|
|
tr_vm_statistics_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.vm_statistics_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_vm_statistics_collector).
|
|
|
|
tr_vm_system_info_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.vm_system_info_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_vm_system_info_collector).
|
|
|
|
tr_vm_memory_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.vm_memory_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_vm_memory_collector).
|
|
|
|
tr_vm_msacc_collector(Conf) ->
|
|
Enabled = conf_get("prometheus.vm_msacc_collector", Conf, disabled),
|
|
collector_enabled(Enabled, prometheus_vm_msacc_collector).
|
|
|
|
collector_enabled(enabled, Collector) -> [Collector];
|
|
collector_enabled(disabled, _) -> [].
|
|
|
|
tr_gen_rpc_default_client_driver(Conf) ->
|
|
conf_get("rpc.protocol", Conf).
|
|
|
|
tr_gen_rpc_tcp_client_port(Conf) ->
|
|
conf_get("rpc.tcp_server_port", Conf).
|
|
|
|
tr_gen_rpc_ssl_client_port(Conf) ->
|
|
conf_get("rpc.ssl_server_port", Conf).
|
|
|
|
tr_gen_rpc_ssl_options(Conf) ->
|
|
Ciphers = conf_get("rpc.ciphers", Conf),
|
|
Versions = conf_get("rpc.tls_versions", Conf),
|
|
[{ciphers, Ciphers}, {versions, Versions}].
|
|
|
|
tr_config_files(_Conf) ->
|
|
case os:getenv("EMQX_ETC_DIR") of
|
|
false ->
|
|
%% testing, or running emqx app as deps
|
|
[filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])];
|
|
Dir ->
|
|
[filename:join([Dir, "emqx.conf"])]
|
|
end.
|
|
|
|
tr_cluster_override_conf_file(Conf) ->
|
|
tr_conf_file(Conf, "cluster-override.conf").
|
|
|
|
tr_local_override_conf_file(Conf) ->
|
|
tr_conf_file(Conf, "local-override.conf").
|
|
|
|
tr_cluster_hocon_file(Conf) ->
|
|
tr_conf_file(Conf, "cluster.hocon").
|
|
|
|
tr_conf_file(Conf, Filename) ->
|
|
DataDir = conf_get("node.data_dir", Conf),
|
|
%% assert, this config is not nullable
|
|
[_ | _] = DataDir,
|
|
filename:join([DataDir, "configs", Filename]).
|
|
|
|
tr_cluster_discovery(Conf) ->
|
|
Strategy = conf_get("cluster.discovery_strategy", Conf),
|
|
{Strategy, filter(cluster_options(Strategy, Conf))}.
|
|
|
|
log_handler_common_confs(Handler, Default) ->
|
|
%% we rarely support dynamic defaults like this
|
|
%% for this one, we have build-time default the same as runtime default
|
|
%% so it's less tricky
|
|
EnableValues =
|
|
case Handler of
|
|
console -> ["console", "both"];
|
|
file -> ["file", "both", "", false]
|
|
end,
|
|
EnvValue = os:getenv("EMQX_DEFAULT_LOG_HANDLER"),
|
|
Enable = lists:member(EnvValue, EnableValues),
|
|
LevelDesc = maps:get(level_desc, Default, "common_handler_level"),
|
|
[
|
|
{"level",
|
|
sc(
|
|
log_level(),
|
|
#{
|
|
default => maps:get(level, Default, warning),
|
|
desc => ?DESC(LevelDesc),
|
|
importance => ?IMPORTANCE_HIGH
|
|
}
|
|
)},
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => Enable,
|
|
desc => ?DESC("common_handler_enable"),
|
|
importance => ?IMPORTANCE_MEDIUM
|
|
}
|
|
)},
|
|
{"formatter",
|
|
sc(
|
|
hoconsc:enum([text, json]),
|
|
#{
|
|
aliases => [format],
|
|
default => maps:get(formatter, Default, text),
|
|
desc => ?DESC("common_handler_formatter"),
|
|
importance => ?IMPORTANCE_MEDIUM
|
|
}
|
|
)},
|
|
{"time_offset",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => <<"system">>,
|
|
desc => ?DESC("common_handler_time_offset"),
|
|
validator => fun validate_time_offset/1,
|
|
importance => ?IMPORTANCE_LOW
|
|
}
|
|
)},
|
|
{"chars_limit",
|
|
sc(
|
|
hoconsc:union([unlimited, range(100, inf)]),
|
|
#{
|
|
default => unlimited,
|
|
desc => ?DESC("common_handler_chars_limit"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"single_line",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("common_handler_single_line"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"sync_mode_qlen",
|
|
sc(
|
|
non_neg_integer(),
|
|
#{
|
|
default => 100,
|
|
desc => ?DESC("common_handler_sync_mode_qlen"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"drop_mode_qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 3000,
|
|
desc => ?DESC("common_handler_drop_mode_qlen"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"flush_qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 8000,
|
|
desc => ?DESC("common_handler_flush_qlen"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"overload_kill", sc(?R_REF("log_overload_kill"), #{importance => ?IMPORTANCE_HIDDEN})},
|
|
{"burst_limit", sc(?R_REF("log_burst_limit"), #{importance => ?IMPORTANCE_HIDDEN})},
|
|
{"supervisor_reports",
|
|
sc(
|
|
hoconsc:enum([error, progress]),
|
|
#{
|
|
default => error,
|
|
desc => ?DESC("common_handler_supervisor_reports"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)},
|
|
{"max_depth",
|
|
sc(
|
|
hoconsc:union([unlimited, non_neg_integer()]),
|
|
#{
|
|
default => 100,
|
|
desc => ?DESC("common_handler_max_depth"),
|
|
importance => ?IMPORTANCE_HIDDEN
|
|
}
|
|
)}
|
|
].
|
|
|
|
crash_dump_file_default() ->
|
|
case os:getenv("EMQX_LOG_DIR") of
|
|
false ->
|
|
%% testing, or running emqx app as deps
|
|
<<"log/erl_crash.dump">>;
|
|
Dir ->
|
|
unicode:characters_to_binary(filename:join([Dir, "erl_crash.dump"]), utf8)
|
|
end.
|
|
|
|
%% utils
|
|
-spec conf_get(string() | [string()], hocon:config()) -> term().
|
|
conf_get(Key, Conf) -> emqx_schema:conf_get(Key, Conf).
|
|
|
|
conf_get(Key, Conf, Default) -> emqx_schema:conf_get(Key, Conf, Default).
|
|
|
|
filter(Opts) ->
|
|
[{K, V} || {K, V} <- Opts, V =/= undefined].
|
|
|
|
%% @private return a list of keys in a parent field
|
|
-spec keys(string(), hocon:config()) -> [string()].
|
|
keys(Parent, Conf) ->
|
|
[binary_to_list(B) || B <- maps:keys(conf_get(Parent, Conf, #{}))].
|
|
|
|
%% types
|
|
|
|
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
|
|
|
map(Name, Type) -> hoconsc:map(Name, Type).
|
|
|
|
cluster_options(static, Conf) ->
|
|
[{seeds, conf_get("cluster.static.seeds", Conf, [])}];
|
|
cluster_options(mcast, Conf) ->
|
|
{ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
|
|
{ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
|
|
Ports = conf_get("cluster.mcast.ports", Conf),
|
|
[
|
|
{addr, Addr},
|
|
{ports, Ports},
|
|
{iface, Iface},
|
|
{ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
|
|
{loop, conf_get("cluster.mcast.loop", Conf, true)}
|
|
];
|
|
cluster_options(dns, Conf) ->
|
|
[
|
|
{name, conf_get("cluster.dns.name", Conf)},
|
|
{type, conf_get("cluster.dns.record_type", Conf)}
|
|
];
|
|
cluster_options(etcd, Conf) ->
|
|
Namespace = "cluster.etcd.ssl_options",
|
|
SslOpts = fun(C) ->
|
|
Options = keys(Namespace, C),
|
|
lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options)
|
|
end,
|
|
[
|
|
{server, conf_get("cluster.etcd.server", Conf)},
|
|
{prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
|
|
{node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
|
|
{ssl_options, filter(SslOpts(Conf))}
|
|
];
|
|
cluster_options(k8s, Conf) ->
|
|
[
|
|
{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
|
|
{service_name, conf_get("cluster.k8s.service_name", Conf)},
|
|
{address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
|
|
{namespace, conf_get("cluster.k8s.namespace", Conf)},
|
|
{suffix, conf_get("cluster.k8s.suffix", Conf, "")}
|
|
];
|
|
cluster_options(manual, _Conf) ->
|
|
[].
|
|
|
|
to_atom(Atom) when is_atom(Atom) ->
|
|
Atom;
|
|
to_atom(Str) when is_list(Str) ->
|
|
list_to_atom(Str);
|
|
to_atom(Bin) when is_binary(Bin) ->
|
|
binary_to_atom(Bin, utf8).
|
|
|
|
roots(Module) ->
|
|
lists:map(fun({_BinName, Root}) -> Root end, hocon_schema:roots(Module)).
|
|
|
|
%% Like authentication schema, authorization schema is incomplete in emqx_schema
|
|
%% module, this function replaces the root field "authorization" with a new schema
|
|
emqx_schema_high_prio_roots() ->
|
|
Roots = emqx_schema:roots(high),
|
|
Authz =
|
|
{"authorization",
|
|
sc(
|
|
?R_REF("authorization"),
|
|
#{
|
|
desc => ?DESC(authorization),
|
|
importance => ?IMPORTANCE_HIGH
|
|
}
|
|
)},
|
|
lists:keyreplace("authorization", 1, Roots, Authz).
|
|
|
|
validate_time_offset(Offset) ->
|
|
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
|
|
Error =
|
|
"Invalid time offset, should be of format: +[hh]:[mm], "
|
|
"i.e. +08:00 or -02:00",
|
|
validator_string_re(Offset, ValidTimeOffset, Error).
|
|
|
|
validator_string_re(Val, RE, Error) ->
|
|
try re:run(Val, RE) of
|
|
nomatch -> {error, Error};
|
|
_ -> ok
|
|
catch
|
|
_:_ -> {error, Error}
|
|
end.
|
|
|
|
node_array() ->
|
|
hoconsc:union([emqx_schema:comma_separated_atoms(), hoconsc:array(atom())]).
|
|
|
|
ensure_file_handlers(Conf, _Opts) ->
|
|
FileFields = lists:flatmap(
|
|
fun({F, Schema}) ->
|
|
Alias = [atom_to_binary(A) || A <- maps:get(aliases, Schema, [])],
|
|
[list_to_binary(F) | Alias]
|
|
end,
|
|
fields("log_file_handler")
|
|
),
|
|
HandlersWithoutName = maps:with(FileFields, Conf),
|
|
HandlersWithName = maps:without(FileFields, Conf),
|
|
%% Make sure the handler with name is high priority than the handler without name
|
|
emqx_utils_maps:deep_merge(#{<<"default">> => HandlersWithoutName}, HandlersWithName).
|
|
|
|
convert_rotation(undefined, _Opts) -> undefined;
|
|
convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
|
|
convert_rotation(Count, _Opts) when is_integer(Count) -> Count.
|
|
|
|
ensure_unicode_path(undefined, _) ->
|
|
undefined;
|
|
ensure_unicode_path(Path, #{make_serializable := true}) ->
|
|
%% format back to serializable string
|
|
unicode:characters_to_binary(Path, utf8);
|
|
ensure_unicode_path(Path, Opts) when is_binary(Path) ->
|
|
case unicode:characters_to_list(Path, utf8) of
|
|
{R, _, _} when R =:= error orelse R =:= incomplete ->
|
|
throw({"bad_file_path_string", Path});
|
|
PathStr ->
|
|
ensure_unicode_path(PathStr, Opts)
|
|
end;
|
|
ensure_unicode_path(Path, _) when is_list(Path) ->
|
|
Path;
|
|
ensure_unicode_path(Path, _) ->
|
|
throw({"not_string", Path}).
|