1371 lines
40 KiB
Erlang
1371 lines
40 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_conf_schema).
|
|
|
|
-dialyzer(no_return).
|
|
-dialyzer(no_match).
|
|
-dialyzer(no_contracts).
|
|
-dialyzer(no_unused).
|
|
-dialyzer(no_fail_call).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
-include_lib("emqx/include/emqx_authentication.hrl").
|
|
|
|
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
|
|
-type file() :: string().
|
|
-type cipher() :: map().
|
|
|
|
-behaviour(hocon_schema).
|
|
|
|
-reflect_type([
|
|
log_level/0,
|
|
file/0,
|
|
cipher/0
|
|
]).
|
|
|
|
-export([namespace/0, roots/0, fields/1, translations/0, translation/1, validations/0, desc/1]).
|
|
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
|
|
|
%% Static apps which merge their configs into the merged emqx.conf
|
|
%% The list can not be made a dynamic read at run-time as it is used
|
|
%% by nodetool to generate app.<time>.config before EMQX is started
|
|
-define(MERGED_CONFIGS, [
|
|
emqx_bridge_schema,
|
|
emqx_retainer_schema,
|
|
emqx_statsd_schema,
|
|
emqx_authn_schema,
|
|
emqx_authz_schema,
|
|
emqx_auto_subscribe_schema,
|
|
emqx_modules_schema,
|
|
emqx_plugins_schema,
|
|
emqx_dashboard_schema,
|
|
emqx_gateway_schema,
|
|
emqx_prometheus_schema,
|
|
emqx_rule_engine_schema,
|
|
emqx_exhook_schema,
|
|
emqx_psk_schema,
|
|
emqx_limiter_schema,
|
|
emqx_connector_schema,
|
|
emqx_slow_subs_schema
|
|
]).
|
|
|
|
%% root config should not have a namespace
|
|
namespace() -> undefined.
|
|
|
|
roots() ->
|
|
PtKey = ?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY,
|
|
case persistent_term:get(PtKey, undefined) of
|
|
undefined -> persistent_term:put(PtKey, emqx_authn_schema);
|
|
_ -> ok
|
|
end,
|
|
emqx_schema_high_prio_roots() ++
|
|
[
|
|
{"node",
|
|
sc(
|
|
?R_REF("node"),
|
|
#{translate_to => ["emqx"]}
|
|
)},
|
|
{"cluster",
|
|
sc(
|
|
?R_REF("cluster"),
|
|
#{translate_to => ["ekka"]}
|
|
)},
|
|
{"log",
|
|
sc(
|
|
?R_REF("log"),
|
|
#{translate_to => ["kernel"]}
|
|
)},
|
|
{"rpc",
|
|
sc(
|
|
?R_REF("rpc"),
|
|
#{translate_to => ["gen_rpc"]}
|
|
)}
|
|
] ++
|
|
emqx_schema:roots(medium) ++
|
|
emqx_schema:roots(low) ++
|
|
lists:flatmap(fun roots/1, ?MERGED_CONFIGS).
|
|
|
|
validations() ->
|
|
hocon_schema:validations(emqx_schema) ++
|
|
lists:flatmap(fun hocon_schema:validations/1, ?MERGED_CONFIGS).
|
|
|
|
fields("cluster") ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
atom(),
|
|
#{
|
|
mapping => "ekka.cluster_name",
|
|
default => emqxcl,
|
|
desc => ?DESC(cluster_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"discovery_strategy",
|
|
sc(
|
|
hoconsc:enum([manual, static, mcast, dns, etcd, k8s]),
|
|
#{
|
|
default => manual,
|
|
desc => ?DESC(cluster_discovery_strategy),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"core_nodes",
|
|
sc(
|
|
emqx_schema:comma_separated_atoms(),
|
|
#{
|
|
mapping => "mria.core_nodes",
|
|
default => [],
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_core_nodes)
|
|
}
|
|
)},
|
|
{"autoclean",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "ekka.cluster_autoclean",
|
|
default => "5m",
|
|
desc => ?DESC(cluster_autoclean),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"autoheal",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
mapping => "ekka.cluster_autoheal",
|
|
default => true,
|
|
desc => ?DESC(cluster_autoheal),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"proto_dist",
|
|
sc(
|
|
hoconsc:enum([inet_tcp, inet6_tcp, inet_tls]),
|
|
#{
|
|
mapping => "ekka.proto_dist",
|
|
default => inet_tcp,
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_proto_dist)
|
|
}
|
|
)},
|
|
{"static",
|
|
sc(
|
|
?R_REF(cluster_static),
|
|
#{}
|
|
)},
|
|
{"mcast",
|
|
sc(
|
|
?R_REF(cluster_mcast),
|
|
#{}
|
|
)},
|
|
{"dns",
|
|
sc(
|
|
?R_REF(cluster_dns),
|
|
#{}
|
|
)},
|
|
{"etcd",
|
|
sc(
|
|
?R_REF(cluster_etcd),
|
|
#{}
|
|
)},
|
|
{"k8s",
|
|
sc(
|
|
?R_REF(cluster_k8s),
|
|
#{}
|
|
)}
|
|
];
|
|
fields(cluster_static) ->
|
|
[
|
|
{"seeds",
|
|
sc(
|
|
hoconsc:array(atom()),
|
|
#{
|
|
default => [],
|
|
desc => ?DESC(cluster_static_seeds),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_mcast) ->
|
|
[
|
|
{"addr",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "239.192.0.1",
|
|
desc => ?DESC(cluster_mcast_addr),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"ports",
|
|
sc(
|
|
hoconsc:array(integer()),
|
|
#{
|
|
default => [4369, 4370],
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_mcast_ports)
|
|
}
|
|
)},
|
|
{"iface",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "0.0.0.0",
|
|
desc => ?DESC(cluster_mcast_iface),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"ttl",
|
|
sc(
|
|
range(0, 255),
|
|
#{
|
|
default => 255,
|
|
desc => ?DESC(cluster_mcast_ttl),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"loop",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC(cluster_mcast_loop),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"sndbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => "16KB",
|
|
desc => ?DESC(cluster_mcast_sndbuf),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"recbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => "16KB",
|
|
desc => ?DESC(cluster_mcast_recbuf),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"buffer",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => "32KB",
|
|
desc => ?DESC(cluster_mcast_buffer),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_dns) ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "localhost",
|
|
desc => ?DESC(cluster_dns_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"record_type",
|
|
sc(
|
|
hoconsc:enum([a, srv]),
|
|
#{
|
|
default => a,
|
|
desc => ?DESC(cluster_dns_record_type),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_etcd) ->
|
|
[
|
|
{"server",
|
|
sc(
|
|
emqx_schema:comma_separated_list(),
|
|
#{
|
|
desc => ?DESC(cluster_etcd_server),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"prefix",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "emqxcl",
|
|
desc => ?DESC(cluster_etcd_prefix),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"node_ttl",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
default => "1m",
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_etcd_node_ttl)
|
|
}
|
|
)},
|
|
{"ssl",
|
|
sc(
|
|
?R_REF(emqx_schema, "ssl_client_opts"),
|
|
#{
|
|
desc => ?DESC(cluster_etcd_ssl),
|
|
'readOnly' => true
|
|
}
|
|
)}
|
|
];
|
|
fields(cluster_k8s) ->
|
|
[
|
|
{"apiserver",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "http://10.110.111.204:8080",
|
|
desc => ?DESC(cluster_k8s_apiserver),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"service_name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "emqx",
|
|
desc => ?DESC(cluster_k8s_service_name),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"address_type",
|
|
sc(
|
|
hoconsc:enum([ip, dns, hostname]),
|
|
#{
|
|
default => ip,
|
|
desc => ?DESC(cluster_k8s_address_type),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"namespace",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "default",
|
|
desc => ?DESC(cluster_k8s_namespace),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"suffix",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "pod.local",
|
|
'readOnly' => true,
|
|
desc => ?DESC(cluster_k8s_suffix)
|
|
}
|
|
)}
|
|
];
|
|
fields("node") ->
|
|
[
|
|
{"name",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "emqx@127.0.0.1",
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_name)
|
|
}
|
|
)},
|
|
{"cookie",
|
|
sc(
|
|
string(),
|
|
#{
|
|
mapping => "vm_args.-setcookie",
|
|
default => "emqxsecretcookie",
|
|
'readOnly' => true,
|
|
sensitive => true,
|
|
desc => ?DESC(node_cookie)
|
|
}
|
|
)},
|
|
{"process_limit",
|
|
sc(
|
|
range(1024, 134217727),
|
|
#{
|
|
mapping => "vm_args.+P",
|
|
desc => ?DESC(process_limit),
|
|
default => 2097152,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"max_ports",
|
|
sc(
|
|
range(1024, 134217727),
|
|
#{
|
|
mapping => "vm_args.+Q",
|
|
desc => ?DESC(max_ports),
|
|
default => 1048576,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"dist_buffer_size",
|
|
sc(
|
|
range(1, 2097151),
|
|
#{
|
|
mapping => "vm_args.+zdbbl",
|
|
desc => ?DESC(dist_buffer_size),
|
|
default => 8192,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"max_ets_tables",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
mapping => "vm_args.+e",
|
|
desc => ?DESC(max_ets_tables),
|
|
default => 262144,
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"data_dir",
|
|
sc(
|
|
string(),
|
|
#{
|
|
required => true,
|
|
'readOnly' => true,
|
|
mapping => "emqx.data_dir",
|
|
desc => ?DESC(node_data_dir)
|
|
}
|
|
)},
|
|
{"config_files",
|
|
sc(
|
|
list(string()),
|
|
#{
|
|
mapping => "emqx.config_files",
|
|
default => undefined,
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_config_files)
|
|
}
|
|
)},
|
|
{"global_gc_interval",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "emqx_machine.global_gc_interval",
|
|
default => "15m",
|
|
desc => ?DESC(node_global_gc_interval),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_file",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP",
|
|
desc => ?DESC(node_crash_dump_file),
|
|
default => "log/erl_crash.dump",
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_seconds",
|
|
sc(
|
|
emqx_schema:duration_s(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP_SECONDS",
|
|
default => "30s",
|
|
desc => ?DESC(node_crash_dump_seconds),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"crash_dump_bytes",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "vm_args.-env ERL_CRASH_DUMP_BYTES",
|
|
default => "100MB",
|
|
desc => ?DESC(node_crash_dump_bytes),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"dist_net_ticktime",
|
|
sc(
|
|
emqx_schema:duration_s(),
|
|
#{
|
|
mapping => "vm_args.-kernel net_ticktime",
|
|
default => "2m",
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_dist_net_ticktime)
|
|
}
|
|
)},
|
|
{"backtrace_depth",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "emqx_machine.backtrace_depth",
|
|
default => 23,
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_backtrace_depth)
|
|
}
|
|
)},
|
|
{"applications",
|
|
sc(
|
|
emqx_schema:comma_separated_atoms(),
|
|
#{
|
|
mapping => "emqx_machine.applications",
|
|
default => [],
|
|
'readOnly' => true,
|
|
desc => ?DESC(node_applications)
|
|
}
|
|
)},
|
|
{"etc_dir",
|
|
sc(
|
|
string(),
|
|
#{
|
|
desc => ?DESC(node_etc_dir),
|
|
'readOnly' => true
|
|
}
|
|
)},
|
|
{"cluster_call",
|
|
sc(
|
|
?R_REF("cluster_call"),
|
|
#{'readOnly' => true}
|
|
)},
|
|
{"db_backend",
|
|
sc(
|
|
hoconsc:enum([mnesia, rlog]),
|
|
#{
|
|
mapping => "mria.db_backend",
|
|
default => rlog,
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_backend)
|
|
}
|
|
)},
|
|
{"db_role",
|
|
sc(
|
|
hoconsc:enum([core, replicant]),
|
|
#{
|
|
mapping => "mria.node_role",
|
|
default => core,
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_role)
|
|
}
|
|
)},
|
|
{"rpc_module",
|
|
sc(
|
|
hoconsc:enum([gen_rpc, rpc]),
|
|
#{
|
|
mapping => "mria.rlog_rpc_module",
|
|
default => gen_rpc,
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_rpc_module)
|
|
}
|
|
)},
|
|
{"tlog_push_mode",
|
|
sc(
|
|
hoconsc:enum([sync, async]),
|
|
#{
|
|
mapping => "mria.tlog_push_mode",
|
|
default => async,
|
|
'readOnly' => true,
|
|
desc => ?DESC(db_tlog_push_mode)
|
|
}
|
|
)},
|
|
{"default_shard_transport",
|
|
sc(
|
|
hoconsc:enum([gen_rpc, distr]),
|
|
#{
|
|
mapping => "mria.shard_transport",
|
|
hidden => true,
|
|
default => gen_rpc,
|
|
desc => ?DESC(db_default_shard_transport)
|
|
}
|
|
)},
|
|
{"shard_transports",
|
|
sc(
|
|
map(shard, hoconsc:enum([gen_rpc, distr])),
|
|
#{
|
|
desc => ?DESC(db_shard_transports),
|
|
hidden => true,
|
|
mapping => "emqx_machine.custom_shard_transports",
|
|
default => #{}
|
|
}
|
|
)}
|
|
];
|
|
fields("cluster_call") ->
|
|
[
|
|
{"retry_interval",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
desc => ?DESC(cluster_call_retry_interval),
|
|
default => "1m"
|
|
}
|
|
)},
|
|
{"max_history",
|
|
sc(
|
|
range(1, 500),
|
|
#{
|
|
desc => ?DESC(cluster_call_max_history),
|
|
default => 100
|
|
}
|
|
)},
|
|
{"cleanup_interval",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
desc => ?DESC(cluster_call_cleanup_interval),
|
|
default => "5m"
|
|
}
|
|
)}
|
|
];
|
|
fields("rpc") ->
|
|
[
|
|
{"mode",
|
|
sc(
|
|
hoconsc:enum([sync, async]),
|
|
#{
|
|
default => async,
|
|
desc => ?DESC(rpc_mode)
|
|
}
|
|
)},
|
|
{"driver",
|
|
sc(
|
|
hoconsc:enum([tcp, ssl]),
|
|
#{
|
|
mapping => "gen_rpc.driver",
|
|
default => tcp,
|
|
desc => ?DESC(rpc_driver)
|
|
}
|
|
)},
|
|
{"async_batch_size",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.max_batch_size",
|
|
default => 256,
|
|
desc => ?DESC(rpc_async_batch_size)
|
|
}
|
|
)},
|
|
{"port_discovery",
|
|
sc(
|
|
hoconsc:enum([manual, stateless]),
|
|
#{
|
|
mapping => "gen_rpc.port_discovery",
|
|
default => stateless,
|
|
desc => ?DESC(rpc_port_discovery)
|
|
}
|
|
)},
|
|
{"tcp_server_port",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.tcp_server_port",
|
|
default => 5369,
|
|
desc => ?DESC(rpc_tcp_server_port)
|
|
}
|
|
)},
|
|
{"ssl_server_port",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.ssl_server_port",
|
|
default => 5369,
|
|
desc => ?DESC(rpc_ssl_server_port)
|
|
}
|
|
)},
|
|
{"tcp_client_num",
|
|
sc(
|
|
range(1, 256),
|
|
#{
|
|
default => 10,
|
|
desc => ?DESC(rpc_tcp_client_num)
|
|
}
|
|
)},
|
|
{"connect_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.connect_timeout",
|
|
default => "5s",
|
|
desc => ?DESC(rpc_connect_timeout)
|
|
}
|
|
)},
|
|
{"certfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.certfile",
|
|
desc => ?DESC(rpc_certfile)
|
|
}
|
|
)},
|
|
{"keyfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.keyfile",
|
|
desc => ?DESC(rpc_keyfile)
|
|
}
|
|
)},
|
|
{"cacertfile",
|
|
sc(
|
|
file(),
|
|
#{
|
|
mapping => "gen_rpc.cacertfile",
|
|
desc => ?DESC(rpc_cacertfile)
|
|
}
|
|
)},
|
|
{"send_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.send_timeout",
|
|
default => "5s",
|
|
desc => ?DESC(rpc_send_timeout)
|
|
}
|
|
)},
|
|
{"authentication_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.authentication_timeout",
|
|
default => "5s",
|
|
desc => ?DESC(rpc_authentication_timeout)
|
|
}
|
|
)},
|
|
{"call_receive_timeout",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
mapping => "gen_rpc.call_receive_timeout",
|
|
default => "15s",
|
|
desc => ?DESC(rpc_call_receive_timeout)
|
|
}
|
|
)},
|
|
{"socket_keepalive_idle",
|
|
sc(
|
|
emqx_schema:duration_s(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_idle",
|
|
default => "15m",
|
|
desc => ?DESC(rpc_socket_keepalive_idle)
|
|
}
|
|
)},
|
|
{"socket_keepalive_interval",
|
|
sc(
|
|
emqx_schema:duration_s(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_interval",
|
|
default => "75s",
|
|
desc => ?DESC(rpc_socket_keepalive_interval)
|
|
}
|
|
)},
|
|
{"socket_keepalive_count",
|
|
sc(
|
|
integer(),
|
|
#{
|
|
mapping => "gen_rpc.socket_keepalive_count",
|
|
default => 9,
|
|
desc => ?DESC(rpc_socket_keepalive_count)
|
|
}
|
|
)},
|
|
{"socket_sndbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_sndbuf",
|
|
default => "1MB",
|
|
desc => ?DESC(rpc_socket_sndbuf)
|
|
}
|
|
)},
|
|
{"socket_recbuf",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_recbuf",
|
|
default => "1MB",
|
|
desc => ?DESC(rpc_socket_recbuf)
|
|
}
|
|
)},
|
|
{"socket_buffer",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
mapping => "gen_rpc.socket_buffer",
|
|
default => "1MB",
|
|
desc => ?DESC(rpc_socket_buffer)
|
|
}
|
|
)}
|
|
];
|
|
fields("log") ->
|
|
[
|
|
{"console_handler", ?R_REF("console_handler")},
|
|
{"file_handlers",
|
|
sc(
|
|
map(name, ?R_REF("log_file_handler")),
|
|
#{desc => ?DESC("log_file_handlers")}
|
|
)}
|
|
];
|
|
fields("console_handler") ->
|
|
log_handler_common_confs(false);
|
|
fields("log_file_handler") ->
|
|
[
|
|
{"file",
|
|
sc(
|
|
file(),
|
|
#{
|
|
desc => ?DESC("log_file_handler_file"),
|
|
validator => fun validate_file_location/1
|
|
}
|
|
)},
|
|
{"rotation",
|
|
sc(
|
|
?R_REF("log_rotation"),
|
|
#{}
|
|
)},
|
|
{"max_size",
|
|
sc(
|
|
hoconsc:union([infinity, emqx_schema:bytesize()]),
|
|
#{
|
|
default => "50MB",
|
|
desc => ?DESC("log_file_handler_max_size")
|
|
}
|
|
)}
|
|
] ++ log_handler_common_confs(true);
|
|
fields("log_rotation") ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("log_rotation_enable")
|
|
}
|
|
)},
|
|
{"count",
|
|
sc(
|
|
range(1, 2048),
|
|
#{
|
|
default => 10,
|
|
desc => ?DESC("log_rotation_count")
|
|
}
|
|
)}
|
|
];
|
|
fields("log_overload_kill") ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("log_overload_kill_enable")
|
|
}
|
|
)},
|
|
{"mem_size",
|
|
sc(
|
|
emqx_schema:bytesize(),
|
|
#{
|
|
default => "30MB",
|
|
desc => ?DESC("log_overload_kill_mem_size")
|
|
}
|
|
)},
|
|
{"qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 20000,
|
|
desc => ?DESC("log_overload_kill_qlen")
|
|
}
|
|
)},
|
|
{"restart_after",
|
|
sc(
|
|
hoconsc:union([emqx_schema:duration_ms(), infinity]),
|
|
#{
|
|
default => "5s",
|
|
desc => ?DESC("log_overload_kill_restart_after")
|
|
}
|
|
)}
|
|
];
|
|
fields("log_burst_limit") ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("log_burst_limit_enable")
|
|
}
|
|
)},
|
|
{"max_count",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 10000,
|
|
desc => ?DESC("log_burst_limit_max_count")
|
|
}
|
|
)},
|
|
{"window_time",
|
|
sc(
|
|
emqx_schema:duration(),
|
|
#{
|
|
default => "1s",
|
|
desc => ?DESC("log_burst_limit_window_time")
|
|
}
|
|
)}
|
|
];
|
|
fields("authorization") ->
|
|
emqx_schema:fields("authorization") ++
|
|
emqx_authz_schema:fields("authorization").
|
|
|
|
desc("cluster") ->
|
|
?DESC("desc_cluster");
|
|
desc(cluster_static) ->
|
|
?DESC("desc_cluster_static");
|
|
desc(cluster_mcast) ->
|
|
?DESC("desc_cluster_mcast");
|
|
desc(cluster_dns) ->
|
|
?DESC("desc_cluster_dns");
|
|
desc(cluster_etcd) ->
|
|
?DESC("desc_cluster_etcd");
|
|
desc(cluster_k8s) ->
|
|
?DESC("desc_cluster_k8s");
|
|
desc("node") ->
|
|
?DESC("desc_node");
|
|
desc("cluster_call") ->
|
|
?DESC("desc_cluster_call");
|
|
desc("rpc") ->
|
|
?DESC("desc_rpc");
|
|
desc("log") ->
|
|
?DESC("desc_log");
|
|
desc("console_handler") ->
|
|
?DESC("desc_console_handler");
|
|
desc("log_file_handler") ->
|
|
?DESC("desc_log_file_handler");
|
|
desc("log_rotation") ->
|
|
?DESC("desc_log_rotation");
|
|
desc("log_overload_kill") ->
|
|
?DESC("desc_log_overload_kill");
|
|
desc("log_burst_limit") ->
|
|
?DESC("desc_log_burst_limit");
|
|
desc("authorization") ->
|
|
?DESC("desc_authorization");
|
|
desc(_) ->
|
|
undefined.
|
|
|
|
translations() -> ["ekka", "kernel", "emqx", "gen_rpc"].
|
|
|
|
translation("ekka") ->
|
|
[{"cluster_discovery", fun tr_cluster_discovery/1}];
|
|
translation("kernel") ->
|
|
[
|
|
{"logger_level", fun tr_logger_level/1},
|
|
{"logger", fun tr_logger/1},
|
|
{"error_logger", fun(_) -> silent end}
|
|
];
|
|
translation("emqx") ->
|
|
[
|
|
{"config_files", fun tr_config_files/1},
|
|
{"cluster_override_conf_file", fun tr_cluster_override_conf_file/1},
|
|
{"local_override_conf_file", fun tr_local_override_conf_file/1}
|
|
];
|
|
translation("gen_rpc") ->
|
|
[{"default_client_driver", fun tr_default_config_driver/1}].
|
|
|
|
tr_default_config_driver(Conf) ->
|
|
conf_get("rpc.driver", Conf).
|
|
|
|
tr_config_files(Conf) ->
|
|
case conf_get("emqx.config_files", Conf) of
|
|
[_ | _] = Files ->
|
|
Files;
|
|
_ ->
|
|
case os:getenv("EMQX_ETC_DIR") of
|
|
false ->
|
|
[filename:join([code:lib_dir(emqx), "etc", "emqx.conf"])];
|
|
Dir ->
|
|
[filename:join([Dir, "emqx.conf"])]
|
|
end
|
|
end.
|
|
|
|
tr_cluster_override_conf_file(Conf) ->
|
|
tr_override_conf_file(Conf, "cluster-override.conf").
|
|
|
|
tr_local_override_conf_file(Conf) ->
|
|
tr_override_conf_file(Conf, "local-override.conf").
|
|
|
|
tr_override_conf_file(Conf, Filename) ->
|
|
DataDir = conf_get("node.data_dir", Conf),
|
|
%% assert, this config is not nullable
|
|
[_ | _] = DataDir,
|
|
filename:join([DataDir, "configs", Filename]).
|
|
|
|
tr_cluster_discovery(Conf) ->
|
|
Strategy = conf_get("cluster.discovery_strategy", Conf),
|
|
{Strategy, filter(cluster_options(Strategy, Conf))}.
|
|
|
|
-spec tr_logger_level(hocon:config()) -> logger:level().
|
|
tr_logger_level(Conf) ->
|
|
ConsoleLevel = conf_get("log.console_handler.level", Conf, undefined),
|
|
FileLevels = [
|
|
conf_get("level", SubConf)
|
|
|| {_, SubConf} <-
|
|
logger_file_handlers(Conf)
|
|
],
|
|
case FileLevels ++ [ConsoleLevel || ConsoleLevel =/= undefined] of
|
|
%% warning is the default level we should use
|
|
[] -> warning;
|
|
Levels -> least_severe_log_level(Levels)
|
|
end.
|
|
|
|
logger_file_handlers(Conf) ->
|
|
Handlers = maps:to_list(conf_get("log.file_handlers", Conf, #{})),
|
|
lists:filter(
|
|
fun({_Name, Opts}) ->
|
|
B = conf_get("enable", Opts),
|
|
true = is_boolean(B),
|
|
B
|
|
end,
|
|
Handlers
|
|
).
|
|
|
|
tr_logger(Conf) ->
|
|
%% For the default logger that outputs to console
|
|
ConsoleHandler =
|
|
case conf_get("log.console_handler.enable", Conf) of
|
|
true ->
|
|
ConsoleConf = conf_get("log.console_handler", Conf),
|
|
[
|
|
{handler, console, logger_std_h, #{
|
|
level => conf_get("log.console_handler.level", Conf),
|
|
config => (log_handler_conf(ConsoleConf))#{type => standard_io},
|
|
formatter => log_formatter(ConsoleConf),
|
|
filters => log_filter(ConsoleConf)
|
|
}}
|
|
];
|
|
false ->
|
|
[]
|
|
end,
|
|
%% For the file logger
|
|
FileHandlers =
|
|
[
|
|
begin
|
|
{handler, to_atom(HandlerName), logger_disk_log_h, #{
|
|
level => conf_get("level", SubConf),
|
|
config => (log_handler_conf(SubConf))#{
|
|
type =>
|
|
case conf_get("rotation.enable", SubConf) of
|
|
true -> wrap;
|
|
_ -> halt
|
|
end,
|
|
file => conf_get("file", SubConf),
|
|
max_no_files => conf_get("rotation.count", SubConf),
|
|
max_no_bytes => conf_get("max_size", SubConf)
|
|
},
|
|
formatter => log_formatter(SubConf),
|
|
filters => log_filter(SubConf),
|
|
filesync_repeat_interval => no_repeat
|
|
}}
|
|
end
|
|
|| {HandlerName, SubConf} <- logger_file_handlers(Conf)
|
|
],
|
|
[{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
|
|
|
|
log_handler_common_confs(Enable) ->
|
|
[
|
|
{"enable",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => Enable,
|
|
desc => ?DESC("common_handler_enable")
|
|
}
|
|
)},
|
|
{"level",
|
|
sc(
|
|
log_level(),
|
|
#{
|
|
default => warning,
|
|
desc => ?DESC("common_handler_level")
|
|
}
|
|
)},
|
|
{"time_offset",
|
|
sc(
|
|
string(),
|
|
#{
|
|
default => "system",
|
|
desc => ?DESC("common_handler_time_offset"),
|
|
validator => fun validate_time_offset/1
|
|
}
|
|
)},
|
|
{"chars_limit",
|
|
sc(
|
|
hoconsc:union([unlimited, range(100, inf)]),
|
|
#{
|
|
default => unlimited,
|
|
desc => ?DESC("common_handler_chars_limit")
|
|
}
|
|
)},
|
|
{"formatter",
|
|
sc(
|
|
hoconsc:enum([text, json]),
|
|
#{
|
|
default => text,
|
|
desc => ?DESC("common_handler_formatter")
|
|
}
|
|
)},
|
|
{"single_line",
|
|
sc(
|
|
boolean(),
|
|
#{
|
|
default => true,
|
|
desc => ?DESC("common_handler_single_line")
|
|
}
|
|
)},
|
|
{"sync_mode_qlen",
|
|
sc(
|
|
non_neg_integer(),
|
|
#{
|
|
default => 100,
|
|
desc => ?DESC("common_handler_sync_mode_qlen")
|
|
}
|
|
)},
|
|
{"drop_mode_qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 3000,
|
|
desc => ?DESC("common_handler_drop_mode_qlen")
|
|
}
|
|
)},
|
|
{"flush_qlen",
|
|
sc(
|
|
pos_integer(),
|
|
#{
|
|
default => 8000,
|
|
desc => ?DESC("common_handler_flush_qlen")
|
|
}
|
|
)},
|
|
{"overload_kill", sc(?R_REF("log_overload_kill"), #{})},
|
|
{"burst_limit", sc(?R_REF("log_burst_limit"), #{})},
|
|
{"supervisor_reports",
|
|
sc(
|
|
hoconsc:enum([error, progress]),
|
|
#{
|
|
default => error,
|
|
desc => ?DESC("common_handler_supervisor_reports")
|
|
}
|
|
)},
|
|
{"max_depth",
|
|
sc(
|
|
hoconsc:union([unlimited, non_neg_integer()]),
|
|
#{
|
|
default => 100,
|
|
desc => ?DESC("common_handler_max_depth")
|
|
}
|
|
)}
|
|
].
|
|
|
|
log_handler_conf(Conf) ->
|
|
SycModeQlen = conf_get("sync_mode_qlen", Conf),
|
|
DropModeQlen = conf_get("drop_mode_qlen", Conf),
|
|
FlushQlen = conf_get("flush_qlen", Conf),
|
|
Overkill = conf_get("overload_kill", Conf),
|
|
BurstLimit = conf_get("burst_limit", Conf),
|
|
#{
|
|
sync_mode_qlen => SycModeQlen,
|
|
drop_mode_qlen => DropModeQlen,
|
|
flush_qlen => FlushQlen,
|
|
overload_kill_enable => conf_get("enable", Overkill),
|
|
overload_kill_qlen => conf_get("qlen", Overkill),
|
|
overload_kill_mem_size => conf_get("mem_size", Overkill),
|
|
overload_kill_restart_after => conf_get("restart_after", Overkill),
|
|
burst_limit_enable => conf_get("enable", BurstLimit),
|
|
burst_limit_max_count => conf_get("max_count", BurstLimit),
|
|
burst_limit_window_time => conf_get("window_time", BurstLimit)
|
|
}.
|
|
|
|
log_formatter(Conf) ->
|
|
CharsLimit =
|
|
case conf_get("chars_limit", Conf) of
|
|
unlimited -> unlimited;
|
|
V when V > 0 -> V
|
|
end,
|
|
TimeOffSet =
|
|
case conf_get("time_offset", Conf) of
|
|
"system" -> "";
|
|
"utc" -> 0;
|
|
OffSetStr -> OffSetStr
|
|
end,
|
|
SingleLine = conf_get("single_line", Conf),
|
|
Depth = conf_get("max_depth", Conf),
|
|
do_formatter(conf_get("formatter", Conf), CharsLimit, SingleLine, TimeOffSet, Depth).
|
|
|
|
%% helpers
|
|
do_formatter(json, CharsLimit, SingleLine, TimeOffSet, Depth) ->
|
|
{emqx_logger_jsonfmt, #{
|
|
chars_limit => CharsLimit,
|
|
single_line => SingleLine,
|
|
time_offset => TimeOffSet,
|
|
depth => Depth
|
|
}};
|
|
do_formatter(text, CharsLimit, SingleLine, TimeOffSet, Depth) ->
|
|
{emqx_logger_textfmt, #{
|
|
template => [time, " [", level, "] ", msg, "\n"],
|
|
chars_limit => CharsLimit,
|
|
single_line => SingleLine,
|
|
time_offset => TimeOffSet,
|
|
depth => Depth
|
|
}}.
|
|
|
|
log_filter(Conf) ->
|
|
case conf_get("supervisor_reports", Conf) of
|
|
error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
|
|
progress -> []
|
|
end.
|
|
|
|
least_severe_log_level(Levels) ->
|
|
hd(sort_log_levels(Levels)).
|
|
|
|
sort_log_levels(Levels) ->
|
|
lists:sort(
|
|
fun(A, B) ->
|
|
case logger:compare_levels(A, B) of
|
|
R when R == lt; R == eq -> true;
|
|
gt -> false
|
|
end
|
|
end,
|
|
Levels
|
|
).
|
|
|
|
%% utils
|
|
-spec conf_get(string() | [string()], hocon:config()) -> term().
|
|
conf_get(Key, Conf) ->
|
|
ensure_list(hocon_maps:get(Key, Conf)).
|
|
|
|
conf_get(Key, Conf, Default) ->
|
|
ensure_list(hocon_maps:get(Key, Conf, Default)).
|
|
|
|
filter(Opts) ->
|
|
[{K, V} || {K, V} <- Opts, V =/= undefined].
|
|
|
|
%% @private return a list of keys in a parent field
|
|
-spec keys(string(), hocon:config()) -> [string()].
|
|
keys(Parent, Conf) ->
|
|
[binary_to_list(B) || B <- maps:keys(conf_get(Parent, Conf, #{}))].
|
|
|
|
%% types
|
|
|
|
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
|
|
|
map(Name, Type) -> hoconsc:map(Name, Type).
|
|
|
|
cluster_options(static, Conf) ->
|
|
[{seeds, conf_get("cluster.static.seeds", Conf, [])}];
|
|
cluster_options(mcast, Conf) ->
|
|
{ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)),
|
|
{ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)),
|
|
Ports = conf_get("cluster.mcast.ports", Conf),
|
|
[
|
|
{addr, Addr},
|
|
{ports, Ports},
|
|
{iface, Iface},
|
|
{ttl, conf_get("cluster.mcast.ttl", Conf, 1)},
|
|
{loop, conf_get("cluster.mcast.loop", Conf, true)}
|
|
];
|
|
cluster_options(dns, Conf) ->
|
|
[
|
|
{name, conf_get("cluster.dns.name", Conf)},
|
|
{type, conf_get("cluster.dns.record_type", Conf)}
|
|
];
|
|
cluster_options(etcd, Conf) ->
|
|
Namespace = "cluster.etcd.ssl",
|
|
SslOpts = fun(C) ->
|
|
Options = keys(Namespace, C),
|
|
lists:map(fun(Key) -> {to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options)
|
|
end,
|
|
[
|
|
{server, conf_get("cluster.etcd.server", Conf)},
|
|
{prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")},
|
|
{node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)},
|
|
{ssl_options, filter(SslOpts(Conf))}
|
|
];
|
|
cluster_options(k8s, Conf) ->
|
|
[
|
|
{apiserver, conf_get("cluster.k8s.apiserver", Conf)},
|
|
{service_name, conf_get("cluster.k8s.service_name", Conf)},
|
|
{address_type, conf_get("cluster.k8s.address_type", Conf, ip)},
|
|
{namespace, conf_get("cluster.k8s.namespace", Conf)},
|
|
{suffix, conf_get("cluster.k8s.suffix", Conf, "")}
|
|
];
|
|
cluster_options(manual, _Conf) ->
|
|
[].
|
|
|
|
to_atom(Atom) when is_atom(Atom) ->
|
|
Atom;
|
|
to_atom(Str) when is_list(Str) ->
|
|
list_to_atom(Str);
|
|
to_atom(Bin) when is_binary(Bin) ->
|
|
binary_to_atom(Bin, utf8).
|
|
|
|
-spec ensure_list(binary() | list(char())) -> list(char()).
|
|
ensure_list(V) ->
|
|
case is_binary(V) of
|
|
true ->
|
|
binary_to_list(V);
|
|
false ->
|
|
V
|
|
end.
|
|
|
|
roots(Module) ->
|
|
lists:map(fun({_BinName, Root}) -> Root end, hocon_schema:roots(Module)).
|
|
|
|
%% Like authentication schema, authorization schema is incomplete in emqx_schema
|
|
%% module, this function replaces the root filed "authorization" with a new schema
|
|
emqx_schema_high_prio_roots() ->
|
|
Roots = emqx_schema:roots(high),
|
|
Authz =
|
|
{"authorization",
|
|
sc(
|
|
?R_REF("authorization"),
|
|
#{desc => ?DESC(authorization)}
|
|
)},
|
|
lists:keyreplace("authorization", 1, Roots, Authz).
|
|
|
|
validate_file_location(File) ->
|
|
ValidFile = "^[/\\_a-zA-Z0-9\\.\\-]*$",
|
|
Error = "Invalid file name: " ++ ValidFile,
|
|
validator_string_re(File, ValidFile, Error).
|
|
|
|
validate_time_offset(Offset) ->
|
|
ValidTimeOffset = "^([\\-\\+][0-1][0-9]:[0-6][0-9]|system|utc)$",
|
|
Error =
|
|
"Invalid time offset, should be of format: +[hh]:[mm], "
|
|
"i.e. +08:00 or -02:00",
|
|
validator_string_re(Offset, ValidTimeOffset, Error).
|
|
|
|
validator_string_re(Val, RE, Error) ->
|
|
try re:run(Val, RE) of
|
|
nomatch -> {error, Error};
|
|
_ -> ok
|
|
catch
|
|
_:_ -> {error, Error}
|
|
end.
|