refactor(schema): prepare for hocon schema doc generation

This commit is contained in:
Zaiming Shi 2021-08-31 22:05:31 +02:00
parent ffbf9b0fab
commit ec13463f4a
29 changed files with 1537 additions and 646 deletions

View File

@ -15,7 +15,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.14.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_zone_schema).
-export([namespace/0, roots/0, fields/1]).
namespace() -> zone.
roots() -> [].
%% zone schemas are clones from the same name from root level
%% only not allowed to have default values.
fields(Name) ->
[{N, no_default(Sc)} || {N, Sc} <- emqx_schema:fields(Name)].
%% no default values for zone settings
no_default(Sc) ->
fun(default) -> undefined;
(Other) -> hocon_schema:field_schema(Sc, Other)
end.

View File

@ -21,7 +21,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -32,6 +33,8 @@
-export([ authenticators/1
]).
namespace() -> authn.
roots() -> [ "authentication" ].
fields("authentication") ->

View File

@ -21,7 +21,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -74,6 +75,8 @@ mnesia(copy) ->
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:scram:builtin_db".
roots() -> [config].
fields(config) ->

View File

@ -22,7 +22,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
, validations/0
]).
@ -37,9 +38,11 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:http".
roots() ->
[ {config, {union, [ hoconsc:t(get)
, hoconsc:t(post)
[ {config, {union, [ hoconsc:ref(?MODULE, get)
, hoconsc:ref(?MODULE, post)
]}}
].

View File

@ -20,7 +20,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -34,10 +35,12 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:jwt".
roots() ->
[ {config, {union, [ hoconsc:t('hmac-based')
, hoconsc:t('public-key')
, hoconsc:t('jwks')
[ {config, {union, [ hoconsc:mk('hmac-based')
, hoconsc:mk('public-key')
, hoconsc:mk('jwks')
]}}
].

View File

@ -21,7 +21,7 @@
-behaviour(hocon_schema).
-export([ roots/0, fields/1 ]).
-export([ namespace/0, roots/0, fields/1 ]).
-export([ create/1
, update/2
@ -79,6 +79,8 @@ mnesia(copy) ->
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:builtin_db".
roots() -> [config].
fields(config) ->
@ -102,7 +104,8 @@ user_id_type(type) -> user_id_type();
user_id_type(default) -> username;
user_id_type(_) -> undefined.
password_hash_algorithm(type) -> {union, [hoconsc:ref(bcrypt), hoconsc:ref(other_algorithms)]};
password_hash_algorithm(type) -> hoconsc:union([hoconsc:ref(?MODULE, bcrypt),
hoconsc:ref(?MODULE, other_algorithms)]);
password_hash_algorithm(default) -> #{<<"name">> => sha256};
password_hash_algorithm(_) -> undefined.

View File

@ -22,7 +22,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -36,10 +37,12 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:mongodb".
roots() ->
[ {config, {union, [ hoconsc:t(standalone)
, hoconsc:t('replica-set')
, hoconsc:t('sharded-cluster')
[ {config, {union, [ hoconsc:mk(standalone)
, hoconsc:mk('replica-set')
, hoconsc:mk('sharded-cluster')
]}}
].

View File

@ -22,7 +22,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -36,6 +37,8 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:mysql".
roots() -> [config].
fields(config) ->

View File

@ -23,7 +23,7 @@
-behaviour(hocon_schema).
-export([ roots/0, fields/1 ]).
-export([ namespace/0, roots/0, fields/1 ]).
-export([ create/1
, update/2
@ -35,6 +35,8 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:postgres".
roots() -> [config].
fields(config) ->

View File

@ -22,7 +22,8 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
@ -36,10 +37,11 @@
%% Hocon Schema
%%------------------------------------------------------------------------------
namespace() -> "authn:redis".
roots() ->
[ {config, {union, [ hoconsc:t(standalone)
, hoconsc:t(cluster)
, hoconsc:t(sentinel)
[ {config, {union, [ hoconsc:mk(standalone)
, hoconsc:mk(cluster)
, hoconsc:mk(sentinel)
]}}
].

View File

@ -13,11 +13,14 @@
-type permission() :: allow | deny.
-type url() :: emqx_http_lib:uri_map().
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1
]).
roots() -> ["authorization"].
namespace() -> authz.
roots() -> [].
fields("authorization") ->
[ {sources, #{type => union_array(

View File

@ -19,25 +19,30 @@
-include_lib("typerefl/include/types.hrl").
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
namespace() -> "auto_subscribe".
roots() ->
["auto_subscribe"].
fields("auto_subscribe") ->
[ {topics, hoconsc:array(hoconsc:ref(?MODULE, "topic"))}];
[ {topics, hoconsc:array(hoconsc:ref(?MODULE, "topic"))}
];
fields("topic") ->
[ {topic, emqx_schema:t(binary())}
, {qos, t(hoconsc:union([0, 1, 2]), 0)}
, {rh, t(hoconsc:union([0, 1, 2]), 0)}
, {rap, t(hoconsc:union([0, 1]), 0)}
, {nl, t(hoconsc:union([0, 1]), 0)}
[ {topic, sc(binary(), #{})}
, {qos, sc(typerefl:union([0, 1, 2]), #{default => 0})}
, {rh, sc(typerefl:union([0, 1, 2]), #{default => 0})}
, {rap, sc(typerefl:union([0, 1]), #{default => 0})}
, {nl, sc(typerefl:union([0, 1]), #{default => 0})}
].
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
t(Type, Default) ->
hoconsc:t(Type, #{default => Default}).
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).

View File

@ -20,55 +20,60 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
namespace() -> "bridge_mqtt".
roots() -> [array("bridge_mqtt")].
array(Name) -> {Name, hoconsc:array(hoconsc:ref(Name))}.
array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}.
fields("bridge_mqtt") ->
[ {name, emqx_schema:t(string(), undefined, true)}
[ {name, sc(string(), #{default => true})}
, {start_type, fun start_type/1}
, {forwards, fun forwards/1}
, {forward_mountpoint, emqx_schema:t(string())}
, {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {batch_size, emqx_schema:t(integer(), undefined, 100)}
, {queue, emqx_schema:t(hoconsc:ref(?MODULE, "queue"))}
, {config, hoconsc:union([hoconsc:ref(?MODULE, "mqtt"), hoconsc:ref(?MODULE, "rpc")])}
, {forward_mountpoint, sc(string(), #{})}
, {reconnect_interval, sc(emqx_schema:duration_ms(), #{default => "30s"})}
, {batch_size, sc(integer(), #{default => 100})}
, {queue, sc(hoconsc:ref(?MODULE, "queue"), #{})}
, {config, sc(hoconsc:union([hoconsc:ref(?MODULE, "mqtt"),
hoconsc:ref(?MODULE, "rpc")]),
#{})}
];
fields("mqtt") ->
[ {conn_type, fun conn_type/1}
, {address, emqx_schema:t(string(), undefined, "127.0.0.1:1883")}
, {address, sc(string(), #{default => "127.0.0.1:1883"})}
, {proto_ver, fun proto_ver/1}
, {bridge_mode, emqx_schema:t(boolean(), undefined, true)}
, {clientid, emqx_schema:t(string())}
, {username, emqx_schema:t(string())}
, {password, emqx_schema:t(string())}
, {clean_start, emqx_schema:t(boolean(), undefined, true)}
, {keepalive, emqx_schema:t(integer(), undefined, 300)}
, {subscriptions, hoconsc:array("subscriptions")}
, {receive_mountpoint, emqx_schema:t(string())}
, {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")}
, {max_inflight, emqx_schema:t(integer(), undefined, 32)}
, {bridge_mode, sc(boolean(), #{default => true})}
, {clientid, sc(string(), #{})}
, {username, sc(string(), #{})}
, {password, sc(string(), #{})}
, {clean_start, sc(boolean(), #{default => true})}
, {keepalive, sc(integer(), #{default => 300})}
, {subscriptions, sc(hoconsc:array(hoconsc:ref(?MODULE, "subscriptions")), #{})}
, {receive_mountpoint, sc(string(), #{})}
, {retry_interval, sc(emqx_schema:duration_ms(), #{default => "30s"})}
, {max_inflight, sc(integer(), #{default => 32})}
];
fields("rpc") ->
[ {conn_type, fun conn_type/1}
, {node, emqx_schema:t(atom(), undefined, 'emqx@127.0.0.1')}
, {node, sc(atom(), #{default => 'emqx@127.0.0.1'})}
];
fields("subscriptions") ->
[ {topic, #{type => binary(), nullable => false}}
, {qos, emqx_schema:t(integer(), undefined, 1)}
, {qos, sc(integer(), #{default => 1})}
];
fields("queue") ->
[ {replayq_dir, hoconsc:union([boolean(), string()])}
, {replayq_seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")}
, {replayq_offload_mode, emqx_schema:t(boolean(), undefined, false)}
, {replayq_max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")}
, {replayq_seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})}
, {replayq_offload_mode, sc(boolean(), #{default => false})}
, {replayq_max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})}
].
conn_type(type) -> hoconsc:enum([mqtt, rpc]);
@ -85,3 +90,5 @@ start_type(_) -> undefined.
forwards(type) -> hoconsc:array(binary());
forwards(default) -> [];
forwards(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -82,10 +82,7 @@ mongo_fields() ->
, {auth_source, #{type => binary(),
nullable => true}}
, {database, fun emqx_connector_schema_lib:database/1}
, {topology, #{type => hoconsc:ref(?MODULE, topology),
default => #{}}}
%% TODO: Does the ref type support nullable=ture ?
% nullable => true}}
, {topology, #{type => hoconsc:ref(?MODULE, topology)}}
] ++
emqx_connector_schema_lib:ssl_fields().
@ -178,7 +175,7 @@ do_start(InstId, Opts0, Config = #{mongo_type := Type,
];
false -> [{ssl, false}]
end,
Topology= maps:get(topology, Config, #{}),
Topology= maps:get(topology, Config, #{}),
Opts = Opts0 ++
[{pool_size, PoolSize},
{options, init_topology_options(maps:to_list(Topology), [])},

View File

@ -27,19 +27,19 @@ fields("emqx_dashboard") ->
hoconsc:ref(?MODULE, "https")]))}
, {default_username, fun default_username/1}
, {default_password, fun default_password/1}
, {sample_interval, emqx_schema:t(emqx_schema:duration_s(), undefined, "10s")}
, {token_expired_time, emqx_schema:t(emqx_schema:duration(), undefined, "30m")}
, {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})}
, {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})}
];
fields("http") ->
[ {"protocol", hoconsc:enum([http, https])}
, {"port", emqx_schema:t(integer(), undefined, 18083)}
, {"num_acceptors", emqx_schema:t(integer(), undefined, 4)}
, {"max_connections", emqx_schema:t(integer(), undefined, 512)}
, {"backlog", emqx_schema:t(integer(), undefined, 1024)}
, {"send_timeout", emqx_schema:t(emqx_schema:duration(), undefined, "5s")}
, {"inet6", emqx_schema:t(boolean(), undefined, false)}
, {"ipv6_v6only", emqx_schema:t(boolean(), undefined, false)}
, {"port", hoconsc:mk(integer(), #{default => 18083})}
, {"num_acceptors", sc(integer(), #{default => 4})}
, {"max_connections", sc(integer(), #{default => 512})}
, {"backlog", sc(integer(), #{default => 1024})}
, {"send_timeout", sc(emqx_schema:duration(), #{default => "5s"})}
, {"inet6", sc(boolean(), #{default => false})}
, {"ipv6_v6only", sc(boolean(), #{dfeault => false})}
];
fields("https") ->
@ -54,3 +54,5 @@ default_password(type) -> string();
default_password(default) -> "public";
default_password(nullable) -> false;
default_password(_) -> undefined.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -22,5 +22,5 @@ fields(ldap) -> connector_fields(ldap).
connector_fields(DB) ->
Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
[{name, hoconsc:t(typerefl:binary())},
[{name, hoconsc:mk(typerefl:binary())},
{type, #{type => DB}}] ++ Mod:roots().

View File

@ -32,43 +32,57 @@
-reflect_type([duration/0]).
-export([roots/0, fields/1]).
-export([namespace/0, roots/0, fields/1]).
-export([t/1, t/3, t/4, ref/1]).
namespace() -> exhook.
roots() -> [exhook].
fields(exhook) ->
[ {request_failed_action, t(union([deny, ignore]), undefined, deny)}
, {request_timeout, t(duration(), undefined, "5s")}
, {auto_reconnect, t(union([false, duration()]), undefined, "60s")}
, {servers, t(hoconsc:array(ref(servers)), undefined, [])}
[ {request_failed_action,
sc(union([deny, ignore]),
#{default => deny})}
, {request_timeout,
sc(duration(),
#{default => "5s"})}
, {auto_reconnect,
sc(union([false, duration()]),
#{ default => "60s"
})}
, {servers,
sc(hoconsc:array(ref(servers)),
#{default => []})}
];
fields(servers) ->
[ {name, string()}
, {url, string()}
, {ssl, t(ref(ssl_conf_group))}
[ {name,
sc(string(),
#{})}
, {url,
sc(string(),
#{})}
, {ssl,
sc(ref(ssl_conf),
#{})}
];
fields(ssl_conf_group) ->
[ {cacertfile, string()}
, {certfile, string()}
, {keyfile, string()}
fields(ssl_conf) ->
[ {cacertfile,
sc(string(),
#{})
}
, {certfile,
sc(string(),
#{})
}
, {keyfile,
sc(string(),
#{})}
].
%% types
t(Type) -> #{type => Type}.
t(Type, Mapping, Default) ->
hoconsc:t(Type, #{mapping => Mapping, default => Default}).
t(Type, Mapping, Default, OverrideEnv) ->
hoconsc:t(Type, #{ mapping => Mapping
, default => Default
, override_env => OverrideEnv
}).
sc(Type, Meta) -> Meta#{type => Type}.
ref(Field) ->
hoconsc:ref(?MODULE, Field).

View File

@ -43,148 +43,149 @@
, ip_port/0
]).
-export([roots/0 , fields/1]).
-export([t/1, t/3, t/4, ref/1]).
-export([namespace/0, roots/0 , fields/1]).
namespace() -> gateway.
roots() -> [gateway].
fields(gateway) ->
[{stomp, t(ref(stomp_structs))},
{mqttsn, t(ref(mqttsn_structs))},
{coap, t(ref(coap_structs))},
{lwm2m, t(ref(lwm2m_structs))},
{exproto, t(ref(exproto_structs))}
[{stomp, sc(ref(stomp_structs))},
{mqttsn, sc(ref(mqttsn_structs))},
{coap, sc(ref(coap_structs))},
{lwm2m, sc(ref(lwm2m_structs))},
{exproto, sc(ref(exproto_structs))}
];
fields(stomp_structs) ->
[ {frame, t(ref(stomp_frame))}
, {listeners, t(ref(tcp_listener_group))}
[ {frame, sc(ref(stomp_frame))}
, {listeners, sc(ref(tcp_listener_group))}
] ++ gateway_common_options();
fields(stomp_frame) ->
[ {max_headers, t(integer(), undefined, 10)}
, {max_headers_length, t(integer(), undefined, 1024)}
, {max_body_length, t(integer(), undefined, 8192)}
[ {max_headers, sc(integer(), undefined, 10)}
, {max_headers_length, sc(integer(), undefined, 1024)}
, {max_body_length, sc(integer(), undefined, 8192)}
];
fields(mqttsn_structs) ->
[ {gateway_id, t(integer())}
, {broadcast, t(boolean())}
, {enable_qos3, t(boolean())}
[ {gateway_id, sc(integer())}
, {broadcast, sc(boolean())}
, {enable_qos3, sc(boolean())}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
, {listeners, t(ref(udp_listener_group))}
, {listeners, sc(ref(udp_listener_group))}
] ++ gateway_common_options();
fields(mqttsn_predefined) ->
[ {id, t(integer())}
, {topic, t(binary())}
[ {id, sc(integer())}
, {topic, sc(binary())}
];
fields(coap_structs) ->
[ {heartbeat, t(duration(), undefined, <<"30s">>)}
, {notify_type, t(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {listeners, t(ref(udp_listener_group))}
[ {heartbeat, sc(duration(), undefined, <<"30s">>)}
, {notify_type, sc(union([non, con, qos]), undefined, qos)}
, {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)}
, {listeners, sc(ref(udp_listener_group))}
] ++ gateway_common_options();
fields(lwm2m_structs) ->
[ {xml_dir, t(binary())}
, {lifetime_min, t(duration())}
, {lifetime_max, t(duration())}
, {qmode_time_windonw, t(integer())}
, {auto_observe, t(boolean())}
, {mountpoint, t(string())}
, {update_msg_publish_condition, t(union([always, contains_object_list]))}
, {translators, t(ref(translators))}
, {listeners, t(ref(udp_listener_group))}
[ {xml_dir, sc(binary())}
, {lifetime_min, sc(duration())}
, {lifetime_max, sc(duration())}
, {qmode_time_windonw, sc(integer())}
, {auto_observe, sc(boolean())}
, {mountpoint, sc(string())}
, {update_msg_publish_condition, sc(union([always, contains_object_list]))}
, {translators, sc(ref(translators))}
, {listeners, sc(ref(udp_listener_group))}
] ++ gateway_common_options();
fields(exproto_structs) ->
[ {server, t(ref(exproto_grpc_server))}
, {handler, t(ref(exproto_grpc_handler))}
, {listeners, t(ref(udp_tcp_listener_group))}
[ {server, sc(ref(exproto_grpc_server))}
, {handler, sc(ref(exproto_grpc_handler))}
, {listeners, sc(ref(udp_tcp_listener_group))}
] ++ gateway_common_options();
fields(exproto_grpc_server) ->
[ {bind, t(union(ip_port(), integer()))}
[ {bind, sc(union(ip_port(), integer()))}
%% TODO: ssl options
];
fields(exproto_grpc_handler) ->
[ {address, t(binary())}
[ {address, sc(binary())}
%% TODO: ssl
];
fields(clientinfo_override) ->
[ {username, t(binary())}
, {password, t(binary())}
, {clientid, t(binary())}
[ {username, sc(binary())}
, {password, sc(binary())}
, {clientid, sc(binary())}
];
fields(translators) ->
[ {command, t(ref(translator))}
, {response, t(ref(translator))}
, {notify, t(ref(translator))}
, {register, t(ref(translator))}
, {update, t(ref(translator))}
[ {command, sc(ref(translator))}
, {response, sc(ref(translator))}
, {notify, sc(ref(translator))}
, {register, sc(ref(translator))}
, {update, sc(ref(translator))}
];
fields(translator) ->
[ {topic, t(binary())}
, {qos, t(range(0, 2))}
[ {topic, sc(binary())}
, {qos, sc(range(0, 2))}
];
fields(udp_listener_group) ->
[ {udp, t(ref(udp_listener))}
, {dtls, t(ref(dtls_listener))}
[ {udp, sc(ref(udp_listener))}
, {dtls, sc(ref(dtls_listener))}
];
fields(tcp_listener_group) ->
[ {tcp, t(ref(tcp_listener))}
, {ssl, t(ref(ssl_listener))}
[ {tcp, sc(ref(tcp_listener))}
, {ssl, sc(ref(ssl_listener))}
];
fields(udp_tcp_listener_group) ->
[ {udp, t(ref(udp_listener))}
, {dtls, t(ref(dtls_listener))}
, {tcp, t(ref(tcp_listener))}
, {ssl, t(ref(ssl_listener))}
[ {udp, sc(ref(udp_listener))}
, {dtls, sc(ref(dtls_listener))}
, {tcp, sc(ref(tcp_listener))}
, {ssl, sc(ref(ssl_listener))}
];
fields(tcp_listener) ->
[ {"$name", t(ref(tcp_listener_settings))}];
[ {"$name", sc(ref(tcp_listener_settings))}];
fields(ssl_listener) ->
[ {"$name", t(ref(ssl_listener_settings))}];
[ {"$name", sc(ref(ssl_listener_settings))}];
fields(udp_listener) ->
[ {"$name", t(ref(udp_listener_settings))}];
[ {"$name", sc(ref(udp_listener_settings))}];
fields(dtls_listener) ->
[ {"$name", t(ref(dtls_listener_settings))}];
[ {"$name", sc(ref(dtls_listener_settings))}];
fields(listener_settings) ->
[ {enable, t(boolean(), undefined, true)}
, {bind, t(union(ip_port(), integer()))}
, {acceptors, t(integer(), undefined, 8)}
, {max_connections, t(integer(), undefined, 1024)}
, {max_conn_rate, t(integer())}
, {active_n, t(integer(), undefined, 100)}
%, {rate_limit, t(comma_separated_list())}
, {access, t(ref(access))}
, {proxy_protocol, t(boolean())}
, {proxy_protocol_timeout, t(duration())}
, {backlog, t(integer(), undefined, 1024)}
, {send_timeout, t(duration(), undefined, <<"15s">>)}
, {send_timeout_close, t(boolean(), undefined, true)}
, {recbuf, t(bytesize())}
, {sndbuf, t(bytesize())}
, {buffer, t(bytesize())}
, {high_watermark, t(bytesize(), undefined, <<"1MB">>)}
, {tune_buffer, t(boolean())}
, {nodelay, t(boolean())}
, {reuseaddr, t(boolean())}
[ {enable, sc(boolean(), undefined, true)}
, {bind, sc(union(ip_port(), integer()))}
, {acceptors, sc(integer(), undefined, 8)}
, {max_connections, sc(integer(), undefined, 1024)}
, {max_conn_rate, sc(integer())}
, {active_n, sc(integer(), undefined, 100)}
%, {rate_limit, sc(comma_separated_list())}
, {access, sc(ref(access))}
, {proxy_protocol, sc(boolean())}
, {proxy_protocol_timeout, sc(duration())}
, {backlog, sc(integer(), undefined, 1024)}
, {send_timeout, sc(duration(), undefined, <<"15s">>)}
, {send_timeout_close, sc(boolean(), undefined, true)}
, {recbuf, sc(bytesize())}
, {sndbuf, sc(bytesize())}
, {buffer, sc(bytesize())}
, {high_watermark, sc(bytesize(), undefined, <<"1MB">>)}
, {tune_buffer, sc(boolean())}
, {nodelay, sc(boolean())}
, {reuseaddr, sc(boolean())}
];
fields(tcp_listener_settings) ->
@ -242,12 +243,12 @@ authentication() ->
]).
gateway_common_options() ->
[ {enable, t(boolean(), undefined, true)}
, {enable_stats, t(boolean(), undefined, true)}
, {idle_timeout, t(duration(), undefined, <<"30s">>)}
, {mountpoint, t(binary())}
, {clientinfo_override, t(ref(clientinfo_override))}
, {authentication, t(authentication(), undefined, undefined)}
[ {enable, sc(boolean(), undefined, true)}
, {enable_stats, sc(boolean(), undefined, true)}
, {idle_timeout, sc(duration(), undefined, <<"30s">>)}
, {mountpoint, sc(binary())}
, {clientinfo_override, sc(ref(clientinfo_override))}
, {authentication, sc(authentication(), undefined, undefined)}
].
%%--------------------------------------------------------------------
@ -255,16 +256,10 @@ gateway_common_options() ->
%% types
t(Type) -> #{type => Type}.
sc(Type) -> #{type => Type}.
t(Type, Mapping, Default) ->
hoconsc:t(Type, #{mapping => Mapping, default => Default}).
t(Type, Mapping, Default, OverrideEnv) ->
hoconsc:t(Type, #{ mapping => Mapping
, default => Default
, override_env => OverrideEnv
}).
sc(Type, Mapping, Default) ->
hoconsc:mk(Type, #{mapping => Mapping, default => Default}).
ref(Field) ->
hoconsc:ref(?MODULE, Field).
@ -273,10 +268,10 @@ ref(Field) ->
%% generate a ssl field.
%% ssl("emqx", #{"verify" => verify_peer}) will return
%% [ {"cacertfile", t(string(), "emqx.cacertfile", undefined)}
%% , {"certfile", t(string(), "emqx.certfile", undefined)}
%% , {"keyfile", t(string(), "emqx.keyfile", undefined)}
%% , {"verify", t(union(verify_peer, verify_none), "emqx.verify", verify_peer)}
%% [ {"cacertfile", sc(string(), "emqx.cacertfile", undefined)}
%% , {"certfile", sc(string(), "emqx.certfile", undefined)}
%% , {"keyfile", sc(string(), "emqx.keyfile", undefined)}
%% , {"verify", sc(union(verify_peer, verify_none), "emqx.verify", verify_peer)}
%% , {"server_name_indication", "emqx.server_name_indication", undefined)}
%% ...
ssl(Mapping, Defaults) ->
@ -286,24 +281,24 @@ ssl(Mapping, Defaults) ->
_ -> Mapping ++ "." ++ Field
end end,
D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end,
[ {"enable", t(boolean(), M("enable"), D("enable"))}
, {"cacertfile", t(binary(), M("cacertfile"), D("cacertfile"))}
, {"certfile", t(binary(), M("certfile"), D("certfile"))}
, {"keyfile", t(binary(), M("keyfile"), D("keyfile"))}
, {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))}
, {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))}
, {"secure_renegotiate", t(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))}
, {"reuse_sessions", t(boolean(), M("reuse_sessions"), D("reuse_sessions"))}
, {"honor_cipher_order", t(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))}
, {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))}
, {"depth", t(integer(), M("depth"), D("depth"))}
, {"password", hoconsc:t(binary(), #{mapping => M("key_password"),
default => D("key_password"),
sensitive => true
[ {"enable", sc(boolean(), M("enable"), D("enable"))}
, {"cacertfile", sc(binary(), M("cacertfile"), D("cacertfile"))}
, {"certfile", sc(binary(), M("certfile"), D("certfile"))}
, {"keyfile", sc(binary(), M("keyfile"), D("keyfile"))}
, {"verify", sc(union(verify_peer, verify_none), M("verify"), D("verify"))}
, {"fail_if_no_peer_cert", sc(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))}
, {"secure_renegotiate", sc(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))}
, {"reuse_sessions", sc(boolean(), M("reuse_sessions"), D("reuse_sessions"))}
, {"honor_cipher_order", sc(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))}
, {"handshake_timeout", sc(duration(), M("handshake_timeout"), D("handshake_timeout"))}
, {"depth", sc(integer(), M("depth"), D("depth"))}
, {"password", hoconsc:mk(binary(), #{ mapping => M("key_password")
, default => D("key_password")
, sensitive => true
})}
, {"dhfile", t(binary(), M("dhfile"), D("dhfile"))}
, {"server_name_indication", t(union(disable, binary()), M("server_name_indication"),
, {"dhfile", sc(binary(), M("dhfile"), D("dhfile"))}
, {"server_name_indication", sc(union(disable, binary()), M("server_name_indication"),
D("server_name_indication"))}
, {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))}
, {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}
, {"psk_ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}].
, {"tls_versions", sc(comma_separated_list(), M("tls_versions"), D("tls_versions"))}
, {"ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))}
, {"psk_ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))}].

View File

@ -23,6 +23,7 @@
-dialyzer(no_fail_call).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
-type file() :: string().
@ -34,8 +35,7 @@
file/0,
cipher/0]).
-export([roots/0, fields/1, translations/0, translation/1]).
-export([t/1, t/3, t/4, ref/1]).
-export([namespace/0, roots/0, fields/1, translations/0, translation/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
%% Static apps which merge their configs into the merged emqx.conf
@ -58,167 +58,406 @@
, emqx_exhook_schema
]).
%% TODO: add a test case to ensure the list elements are unique
namespace() -> undefined.
roots() ->
["cluster", "node", "rpc", "log"]
++ lists:flatmap(fun(Mod) -> Mod:roots() end, ?MERGED_CONFIGS).
["cluster", "node", "rpc", "log"] ++ lists:flatmap(fun roots/1, ?MERGED_CONFIGS).
fields("cluster") ->
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
, {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]),
undefined, manual)}
, {"autoclean", t(emqx_schema:duration(), "ekka.cluster_autoclean", "5m")}
, {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)}
, {"static", ref("static")}
, {"mcast", ref("mcast")}
, {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)}
, {"dns", ref("dns")}
, {"etcd", ref("etcd")}
, {"k8s", ref("k8s")}
, {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)}
, {"rlog", ref("rlog")}
[ {"name",
sc(atom(),
#{ mapping => "ekka.cluster_name"
, default => emqxcl
})}
, {"discovery_strategy",
sc(union([manual, static, mcast, dns, etcd, k8s]),
#{ default => manual
})}
, {"autoclean",
sc(emqx_schema:duration(),
#{ mapping => "ekka.cluster_autoclean"
, default => "5m"
})}
, {"autoheal",
sc(boolean(),
#{ mapping => "ekka.cluster_autoheal"
, default => true
})}
, {"static",
sc(ref(cluster_static),
#{})}
, {"mcast",
sc(ref(cluster_mcast),
#{})}
, {"proto_dist",
sc(union([inet_tcp, inet6_tcp, inet_tls]),
#{ mapping => "ekka.proto_dist"
, default => inet_tcp
})}
, {"dns",
sc(ref(cluster_dns),
#{})}
, {"etcd",
sc(ref(cluster_etcd),
#{})}
, {"k8s",
sc(ref(cluster_k8s),
#{})}
, {"db_backend",
sc(union([mnesia, rlog]),
#{ mapping => "ekka.db_backend"
, default => mnesia
})}
, {"rlog",
sc(ref("rlog"),
#{})}
];
fields("static") ->
[ {"seeds", t(hoconsc:array(string()), undefined, [])}];
fields("mcast") ->
[ {"addr", t(string(), undefined, "239.192.0.1")}
, {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])}
, {"iface", t(string(), undefined, "0.0.0.0")}
, {"ttl", t(range(0, 255), undefined, 255)}
, {"loop", t(boolean(), undefined, true)}
, {"sndbuf", t(emqx_schema:bytesize(), undefined, "16KB")}
, {"recbuf", t(emqx_schema:bytesize(), undefined, "16KB")}
, {"buffer", t(emqx_schema:bytesize(), undefined, "32KB")}
fields(cluster_static) ->
[ {"seeds",
sc(hoconsc:array(string()),
#{ default => []
})}
];
fields("dns") ->
[ {"name", t(string(), undefined, "localhost")}
, {"app", t(string(), undefined, "emqx")}];
fields("etcd") ->
[ {"server", t(emqx_schema:comma_separated_list())}
, {"prefix", t(string(), undefined, "emqxcl")}
, {"node_ttl", t(emqx_schema:duration(), undefined, "1m")}
, {"ssl", ref("etcd_ssl")}
fields(cluster_mcast) ->
[ {"addr",
sc(string(),
#{ default => "239.192.0.1"
})}
, {"ports",
sc(hoconsc:array(integer()),
#{ default => [4369, 4370]
})}
, {"iface",
sc(string(),
#{ default => "0.0.0.0"
})}
, {"ttl",
sc(range(0, 255),
#{ default => 255
})}
, {"loop",
sc(boolean(),
#{ default => true
})}
, {"sndbuf",
sc(emqx_schema:bytesize(),
#{ default => "16KB"
})}
, {"recbuf",
sc(emqx_schema:bytesize(),
#{ default => "16KB"
})}
, {"buffer",
sc(emqx_schema:bytesize(),
#{ default =>"32KB"
})}
];
fields("etcd_ssl") ->
fields(cluster_dns) ->
[ {"name",
sc(string(),
#{ default => "localhost"
})}
, {"app",
sc(string(),
#{ default => "emqx"
})}
];
fields(cluster_etcd) ->
[ {"server",
sc(emqx_schema:comma_separated_list(),
#{})}
, {"prefix",
sc(string(),
#{ default => "emqxcl"
})}
, {"node_ttl",
sc(emqx_schema:duration(),
#{ default => "1m"
})}
, {"ssl",
sc(ref(etcd_ssl_opts),
#{})}
];
fields(etcd_ssl_opts) ->
emqx_schema:ssl(#{});
fields("k8s") ->
[ {"apiserver", t(string())}
, {"service_name", t(string(), undefined, "emqx")}
, {"address_type", t(union([ip, dns, hostname]))}
, {"app_name", t(string(), undefined, "emqx")}
, {"namespace", t(string(), undefined, "default")}
, {"suffix", t(string(), undefined, "pod.local")}
fields(cluster_k8s) ->
[ {"apiserver",
sc(string(),
#{})}
, {"service_name",
sc(string(),
#{ default => "emqx"
})}
, {"address_type",
sc(union([ip, dns, hostname]),
#{})}
, {"app_name",
sc(string(),
#{ default => "emqx"
})}
, {"namespace",
sc(string(),
#{ default => "default"
})}
, {"suffix",
sc(string(),
#{default => "pod.local"
})}
];
fields("rlog") ->
[ {"role", t(union([core, replicant]), "ekka.node_role", core)}
, {"core_nodes", t(emqx_schema:comma_separated_atoms(), "ekka.core_nodes", [])}
[ {"role",
sc(union([core, replicant]),
#{ mapping => "ekka.node_role"
, default => core
})}
, {"core_nodes",
sc(emqx_schema:comma_separated_atoms(),
#{ mapping => "ekka.core_nodes"
, default => []
})}
];
fields("node") ->
[ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1",
override_env => "EMQX_NODE_NAME"
})}
, {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie",
default => "emqxsecretcookie",
sensitive => true,
override_env => "EMQX_NODE_COOKIE"
})}
, {"data_dir", hoconsc:t(string(), #{nullable => false})}
, {"config_files", t(list(string()), "emqx.config_files", undefined)}
, {"global_gc_interval", t(emqx_schema:duration(), undefined, "15m")}
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
, {"dist_net_ticktime", t(emqx_schema:duration(), "vm_args.-kernel net_ticktime", "2m")}
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
, {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
, {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)}
, {"cluster_call", ref("cluster_call")}
[ {"name",
sc(string(),
#{ default => "emqx@127.0.0.1"
, override_env => "EMQX_NODE_NAME"
})}
, {"cookie",
sc(string(),
#{ mapping => "vm_args.-setcookie",
default => "emqxsecretcookie",
sensitive => true,
override_env => "EMQX_NODE_COOKIE"
})}
, {"data_dir",
sc(string(),
#{ nullable => false
})}
, {"config_files",
sc(list(string()),
#{ mapping => "emqx.config_files"
, default => undefined
})}
, {"global_gc_interval",
sc(emqx_schema:duration(),
#{ default => "15m"
})}
, {"crash_dump_dir",
sc(file(),
#{ mapping => "vm_args.-env ERL_CRASH_DUMP"
})}
, {"dist_net_ticktime",
sc(emqx_schema:duration(),
#{ mapping => "vm_args.-kernel net_ticktime"
, default => "2m"
})}
, {"dist_listen_min",
sc(range(1024, 65535),
#{ mapping => "kernel.inet_dist_listen_min"
, default => 6369
})}
, {"dist_listen_max",
sc(range(1024, 65535),
#{ mapping => "kernel.inet_dist_listen_max"
, default => 6369
})}
, {"backtrace_depth",
sc(integer(),
#{ mapping => "emqx_machine.backtrace_depth"
, default => 23
})}
, {"cluster_call",
sc(ref("cluster_call"),
#{}
)}
];
fields("cluster_call") ->
[ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")}
, {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)}
, {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")}
[ {"retry_interval",
sc(emqx_schema:duration(),
#{ mapping => "emqx_machine.retry_interval"
, default => "1s"
})}
, {"max_history",
sc(range(1, 500),
#{mapping => "emqx_machine.max_history",
default => 100
})}
, {"cleanup_interval",
sc(emqx_schema:duration(),
#{mapping => "emqx_machine.cleanup_interval",
default => "5m"
})}
];
fields("rpc") ->
[ {"mode", t(union(sync, async), undefined, async)}
, {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)}
, {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)}
, {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)}
, {"tcp_client_num", t(range(1, 256), undefined, 1)}
, {"connect_timeout", t(emqx_schema:duration(), "gen_rpc.connect_timeout", "5s")}
, {"send_timeout", t(emqx_schema:duration(), "gen_rpc.send_timeout", "5s")}
, {"authentication_timeout", t(emqx_schema:duration(), "gen_rpc.authentication_timeout", "5s")}
, {"call_receive_timeout", t(emqx_schema:duration(), "gen_rpc.call_receive_timeout", "15s")}
, {"socket_keepalive_idle", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")}
, {"socket_keepalive_interval", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_interval", "75s")}
, {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)}
, {"socket_sndbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_sndbuf", "1MB")}
, {"socket_recbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_recbuf", "1MB")}
, {"socket_buffer", t(emqx_schema:bytesize(), "gen_rpc.socket_buffer", "1MB")}
[ {"mode",
sc(union(sync, async),
#{ default => async
})}
, {"async_batch_size",
sc(integer(),
#{ mapping => "gen_rpc.max_batch_size"
, default => 256
})}
, {"port_discovery",
sc(union(manual, stateless),
#{ mapping => "gen_rpc.port_discovery"
, default => stateless
})}
, {"tcp_server_port",
sc(integer(),
#{ mapping => "gen_rpc.tcp_server_port"
, default => 5369
})}
, {"tcp_client_num",
sc(range(1, 256),
#{ default => 1
})}
, {"connect_timeout",
sc(emqx_schema:duration(),
#{ mapping => "gen_rpc.connect_timeout",
default => "5s"
})}
, {"send_timeout",
sc(emqx_schema:duration(),
#{ mapping => "gen_rpc.send_timeout"
, default => "5s"
})}
, {"authentication_timeout",
sc(emqx_schema:duration(),
#{ mapping=> "gen_rpc.authentication_timeout"
, default => "5s"
})}
, {"call_receive_timeout",
sc(emqx_schema:duration(),
#{ mapping => "gen_rpc.call_receive_timeout"
, default => "15s"
})}
, {"socket_keepalive_idle",
sc(emqx_schema:duration_s(),
#{ mapping => "gen_rpc.socket_keepalive_idle"
, default => "7200s"
})}
, {"socket_keepalive_interval",
sc(emqx_schema:duration_s(),
#{ mapping => "gen_rpc.socket_keepalive_interval",
default => "75s"
})}
, {"socket_keepalive_count",
sc(integer(),
#{ mapping => "gen_rpc.socket_keepalive_count"
, default => 9
})}
, {"socket_sndbuf",
sc(emqx_schema:bytesize(),
#{ mapping => "gen_rpc.socket_sndbuf"
, default => "1MB"
})}
, {"socket_recbuf",
sc(emqx_schema:bytesize(),
#{ mapping => "gen_rpc.socket_recbuf"
, default => "1MB"
})}
, {"socket_buffer",
sc(emqx_schema:bytesize(),
#{ mapping => "gen_rpc.socket_buffer"
, default => "1MB"
})}
];
fields("log") ->
[ {"console_handler", ref("console_handler")}
, {"file_handlers", ref("file_handlers")}
, {"error_logger", t(atom(), "kernel.error_logger", silent)}
, {"file_handlers",
sc(ref("file_handlers"),
#{})}
, {"error_logger",
sc(atom(),
#{mapping => "kernel.error_logger",
default => silent})}
];
fields("console_handler") ->
[ {"enable", t(boolean(), undefined, false)}
[ {"enable",
sc(boolean(),
#{ default => false
})}
] ++ log_handler_common_confs();
fields("file_handlers") ->
[ {"$name", ref("log_file_handler")}
[ {"$name",
sc(ref("log_file_handler"),
#{})}
];
fields("log_file_handler") ->
[ {"file", t(file(), undefined, undefined)}
, {"rotation", ref("log_rotation")}
, {"max_size", #{type => union([infinity, emqx_schema:bytesize()]),
default => "10MB"}}
[ {"file",
sc(file(),
#{})}
, {"rotation",
sc(ref("log_rotation"),
#{})}
, {"max_size",
sc(union([infinity, emqx_schema:bytesize()]),
#{ default => "10MB"
})}
] ++ log_handler_common_confs();
fields("log_rotation") ->
[ {"enable", t(boolean(), undefined, true)}
, {"count", t(range(1, 2048), undefined, 10)}
[ {"enable",
sc(boolean(),
#{ default => true
})}
, {"count",
sc(range(1, 2048),
#{ default => 10
})}
];
fields("log_overload_kill") ->
[ {"enable", t(boolean(), undefined, true)}
, {"mem_size", t(emqx_schema:bytesize(), undefined, "30MB")}
, {"qlen", t(integer(), undefined, 20000)}
, {"restart_after", t(union(emqx_schema:duration(), infinity), undefined, "5s")}
[ {"enable",
sc(boolean(),
#{ default => true
})}
, {"mem_size",
sc(emqx_schema:bytesize(),
#{ default => "30MB"
})}
, {"qlen",
sc(integer(),
#{ default => 20000
})}
, {"restart_after",
sc(union(emqx_schema:duration(), infinity),
#{ default => "5s"
})}
];
fields("log_burst_limit") ->
[ {"enable", t(boolean(), undefined, true)}
, {"max_count", t(integer(), undefined, 10000)}
, {"window_time", t(emqx_schema:duration(), undefined, "1s")}
[ {"enable",
sc(boolean(),
#{ default => true
})}
, {"max_count",
sc(integer(),
#{ default => 10000
})}
, {"window_time",
sc(emqx_schema:duration(),
#{default => "1s"})}
];
fields("authorization") ->
emqx_schema:fields("authorization") ++
emqx_authz_schema:fields("authorization");
fields(Name) ->
find_field(Name, ?MERGED_CONFIGS).
find_field(Name, []) ->
error({unknown_config_struct_field, Name});
find_field(Name, [SchemaModule | Rest]) ->
case lists:member(bin(Name), hocon_schema:root_names(SchemaModule)) of
true -> SchemaModule:fields(Name);
false -> find_field(Name, Rest)
end.
emqx_authz_schema:fields("authorization").
translations() -> ["ekka", "kernel", "emqx"].
@ -302,20 +541,52 @@ tr_logger(Conf) ->
[{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers.
log_handler_common_confs() ->
[ {"level", t(log_level(), undefined, warning)}
, {"time_offset", t(string(), undefined, "system")}
, {"chars_limit", #{type => hoconsc:union([unlimited, range(1, inf)]),
default => unlimited
}}
, {"formatter", t(union([text, json]), undefined, text)}
, {"single_line", t(boolean(), undefined, true)}
, {"sync_mode_qlen", t(integer(), undefined, 100)}
, {"drop_mode_qlen", t(integer(), undefined, 3000)}
, {"flush_qlen", t(integer(), undefined, 8000)}
, {"overload_kill", ref("log_overload_kill")}
, {"burst_limit", ref("log_burst_limit")}
, {"supervisor_reports", t(union([error, progress]), undefined, error)}
, {"max_depth", t(union([unlimited, integer()]), undefined, 100)}
[ {"level",
sc(log_level(),
#{ default => warning
})}
, {"time_offset",
sc(string(),
#{ default => "system"
})}
, {"chars_limit",
sc(hoconsc:union([unlimited, range(1, inf)]),
#{ default => unlimited
})}
, {"formatter",
sc(union([text, json]),
#{ default => text
})}
, {"single_line",
sc(boolean(),
#{ default => true
})}
, {"sync_mode_qlen",
sc(integer(),
#{ default => 100
})}
, {"drop_mode_qlen",
sc(integer(),
#{ default => 3000
})}
, {"flush_qlen",
sc(integer(),
#{ default => 8000
})}
, {"overload_kill",
sc(ref("log_overload_kill"),
#{})}
, {"burst_limit",
sc(ref("log_burst_limit"),
#{})}
, {"supervisor_reports",
sc(union([error, progress]),
#{ default => error
})}
, {"max_depth",
sc(union([unlimited, integer()]),
#{ default => 100
})}
].
log_handler_conf(Conf) ->
@ -424,18 +695,9 @@ keys(Parent, Conf) ->
%% types
t(Type) -> hoconsc:t(Type).
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
t(Type, Mapping, Default) ->
hoconsc:t(Type, #{mapping => Mapping, default => Default}).
t(Type, Mapping, Default, OverrideEnv) ->
hoconsc:t(Type, #{ mapping => Mapping
, default => Default
, override_env => OverrideEnv
}).
ref(Field) -> hoconsc:t(hoconsc:ref(Field)).
ref(Field) -> hoconsc:ref(?MODULE, Field).
options(static, Conf) ->
[{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}];
@ -475,6 +737,6 @@ to_atom(Str) when is_list(Str) ->
to_atom(Bin) when is_binary(Bin) ->
binary_to_atom(Bin, utf8).
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
bin(Bin) when is_binary(Bin) -> Bin;
bin(L) when is_list(L) -> iolist_to_binary(L).
roots(Module) ->
lists:map(fun({_BinName, Root}) -> Root end,
maps:to_list(hocon_schema:roots(Module))).

View File

@ -19,9 +19,12 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
namespace() -> management.
roots() -> [].
fields(_) -> [].

View File

@ -20,9 +20,12 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
namespace() -> modules.
roots() ->
["delayed",
"recon",
@ -33,32 +36,34 @@ roots() ->
fields(Name) when Name =:= "recon";
Name =:= "telemetry" ->
[ {enable, emqx_schema:t(boolean(), undefined, false)}
[ {enable, hoconsc:mk(boolean(), #{default => false})}
];
fields("delayed") ->
[ {enable, emqx_schema:t(boolean(), undefined, false)}
, {max_delayed_messages, emqx_schema:t(integer())}
[ {enable, hoconsc:mk(boolean(), #{default => false})}
, {max_delayed_messages, sc(integer(), #{})}
];
fields("rewrite") ->
[ {action, hoconsc:enum([publish, subscribe])}
, {source_topic, emqx_schema:t(binary())}
, {re, emqx_schema:t(binary())}
, {dest_topic, emqx_schema:t(binary())}
, {source_topic, sc(binary(), #{})}
, {re, sc(binary(), #{})}
, {dest_topic, sc(binary(), #{})}
];
fields("event_message") ->
[ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_subscribed", emqx_schema:t(boolean(), undefined, false)}
, {"$event/client_unsubscribed", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_delivered", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_acked", emqx_schema:t(boolean(), undefined, false)}
, {"$event/message_dropped", emqx_schema:t(boolean(), undefined, false)}
[ {"$event/client_connected", sc(boolean(), #{default => false})}
, {"$event/client_disconnected", sc(boolean(), #{default => false})}
, {"$event/client_subscribed", sc(boolean(), #{default => false})}
, {"$event/client_unsubscribed", sc(boolean(), #{default => false})}
, {"$event/message_delivered", sc(boolean(), #{default => false})}
, {"$event/message_acked", sc(boolean(), #{default => false})}
, {"$event/message_dropped", sc(boolean(), #{default => false})}
];
fields("topic_metrics") ->
[{topic, emqx_schema:t(binary())}].
[{topic, sc(binary(), #{})}].
array(Name) -> {Name, hoconsc:array(hoconsc:ref(Name))}.
array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -19,13 +19,18 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
namespace() -> "prometheus".
roots() -> ["prometheus"].
fields("prometheus") ->
[ {push_gateway_server, emqx_schema:t(string())}
, {interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "15s")}
, {enable, emqx_schema:t(boolean(), undefined, false)}
[ {push_gateway_server, sc(string(), #{})}
, {interval, sc(emqx_schema:duration_ms(), #{default => "15s"})}
, {enable, sc(boolean(), #{default => false})}
].
sc(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -4,40 +4,40 @@
-export([roots/0, fields/1]).
-define(TYPE(Type), hoconsc:t(Type)).
-define(TYPE(Type), hoconsc:mk(Type)).
roots() -> ["emqx_retainer"].
fields("emqx_retainer") ->
[ {enable, t(boolean(), false)}
, {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")}
, {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")}
[ {enable, sc(boolean(), false)}
, {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")}
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}
, {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))}
, {max_payload_size, t(emqx_schema:bytesize(), "1MB")}
, {max_payload_size, sc(emqx_schema:bytesize(), "1MB")}
, {config, config()}
];
fields(mnesia_config) ->
[ {type, ?TYPE(hoconsc:union([built_in_database]))}
, {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)}
, {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)}
, {storage_type, sc(hoconsc:union([ram, disc, disc_only]), ram)}
, {max_retained_messages, sc(integer(), 0, fun is_pos_integer/1)}
];
fields(flow_control) ->
[ {max_read_number, t(integer(), 0, fun is_pos_integer/1)}
, {msg_deliver_quota, t(integer(), 0, fun is_pos_integer/1)}
, {quota_release_interval, t(emqx_schema:duration_ms(), "0ms")}
[ {max_read_number, sc(integer(), 0, fun is_pos_integer/1)}
, {msg_deliver_quota, sc(integer(), 0, fun is_pos_integer/1)}
, {quota_release_interval, sc(emqx_schema:duration_ms(), "0ms")}
].
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
t(Type, Default) ->
hoconsc:t(Type, #{default => Default}).
sc(Type, Default) ->
hoconsc:mk(Type, #{default => Default}).
t(Type, Default, Validator) ->
hoconsc:t(Type, #{default => Default,
validator => Validator}).
sc(Type, Default, Validator) ->
hoconsc:mk(Type, #{default => Default,
validator => Validator}).
is_pos_integer(V) ->
V >= 0.

View File

@ -20,10 +20,13 @@
-behaviour(hocon_schema).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
roots() -> ["emqx_rule_engine"].
namespace() -> rule_engine.
fields("emqx_rule_engine") ->
[{ignore_sys_message, emqx_schema:t(boolean(), undefined, true)}].
roots() -> ["rule_engine"].
fields("rule_engine") ->
[{ignore_sys_message, hoconsc:mk(boolean(), #{default => true})}].

View File

@ -594,5 +594,6 @@ printable_maps(Headers) ->
end, #{}, Headers).
ignore_sys_message(#message{flags = Flags}) ->
ConfigRootKey = emqx_rule_engine_schema:namespace(),
maps:get(sys, Flags, false) andalso
emqx:get_config([emqx_rule_engine, ignore_sys_message]).
emqx:get_config([ConfigRootKey, ignore_sys_message]).

View File

@ -6,15 +6,18 @@
-export([to_ip_port/1]).
-export([ roots/0
-export([ namespace/0
, roots/0
, fields/1]).
-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}).
namespace() -> "statsd".
roots() -> ["statsd"].
fields("statsd") ->
[ {enable, emqx_schema:t(boolean(), undefined, false)}
[ {enable, hoconsc:mk(boolean(), #{default => false})}
, {server, fun server/1}
, {sample_time_interval, fun duration_ms/1}
, {flush_time_interval, fun duration_ms/1}

View File

@ -60,7 +60,7 @@
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.14.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
]}.