Add more configurations for EMQ X R3.0

This commit is contained in:
Feng Lee 2018-08-08 19:23:32 +08:00
parent 8418be0a5b
commit 4cf1815030
2 changed files with 973 additions and 829 deletions

File diff suppressed because it is too large Load Diff

View File

@ -165,10 +165,10 @@
end}. end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Erlang Node %% Node
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Erlang node name %% @doc Node name
{mapping, "node.name", "vm_args.-name", [ {mapping, "node.name", "vm_args.-name", [
{default, "emqx@127.0.0.1"} {default, "emqx@127.0.0.1"}
]}. ]}.
@ -321,7 +321,7 @@ end}.
]}. ]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% RPC Args %% RPC
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% RPC server port. %% RPC server port.
@ -550,368 +550,39 @@ end}.
]}. ]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Allow Anonymous and Default ACL %% Authentication/ACL
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Allow Anonymous %% @doc Allow anonymous authentication.
{mapping, "mqtt.allow_anonymous", "emqx.allow_anonymous", [ {mapping, "allow_anonymous", "emqx.allow_anonymous", [
{default, false}, {default, false},
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc ACL nomatch %% @doc Default ACL file.
{mapping, "mqtt.acl_nomatch", "emqx.acl_nomatch", [ {mapping, "acl_file", "emqx.acl_file", [
{default, allow},
{datatype, {enum, [allow, deny]}}
]}.
%% @doc Default ACL File
{mapping, "mqtt.acl_file", "emqx.acl_file", [
{datatype, string}, {datatype, string},
hidden hidden
]}. ]}.
%% @doc Cache ACL for PUBLISH %% @doc Enable ACL cache for publish.
{mapping, "mqtt.cache_acl", "emqx.cache_acl", [ {mapping, "enable_acl_cache", "emqx.enable_acl_cache", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------
%% @doc Set the Max ClientId Length Allowed.
{mapping, "mqtt.max_clientid_len", "emqx.protocol", [
{default, 1024},
{datatype, integer}
]}.
%% @doc Max Packet Size Allowed, 64K by default.
{mapping, "mqtt.max_packet_size", "emqx.protocol", [
{default, "64KB"},
{datatype, bytesize}
]}.
%% @doc Keepalive backoff
{mapping, "mqtt.keepalive_backoff", "emqx.protocol", [
{default, 1.25},
{datatype, float}
]}.
{translation, "emqx.protocol", fun(Conf) ->
[{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
{keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}]
end}.
{mapping, "mqtt.websocket_protocol_header", "emqx.websocket_protocol_header", [
{default, on}, {default, on},
{datatype, flag} {datatype, flag}
]}. ]}.
{mapping, "mqtt.websocket_check_upgrade_header", "emqx.websocket_check_upgrade_header", [ %% @doc ACL cache age.
{default, on}, {mapping, "acl_cache_age", "emqx.acl_cache_age", [
{datatype, flag} {default, "5m"},
]}.
%%--------------------------------------------------------------------
%% MQTT Connection
%%--------------------------------------------------------------------
%% @doc Force the client to GC: integer
{mapping, "mqtt.conn.force_gc_count", "emqx.conn_force_gc_count", [
{datatype, integer}
]}.
%%--------------------------------------------------------------------
%% MQTT Client
%%--------------------------------------------------------------------
%% @doc Max Publish Rate of Message
{mapping, "mqtt.client.max_publish_rate", "emqx.client", [
{default, 0},
{datatype, integer}
]}.
%% @doc Client Idle Timeout.
{mapping, "mqtt.client.idle_timeout", "emqx.client", [
{default, "30s"},
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.
%% @doc Enable Stats of Client. %% @doc ACL cache size.
{mapping, "mqtt.client.enable_stats", "emqx.client", [ {mapping, "acl_cache_size", "emqx.acl_cache_size", [
{default, off},
{datatype, flag}
]}.
{translation, "emqx.client", fun(Conf) ->
[{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)},
{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)},
{client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Session
%%--------------------------------------------------------------------
%% @doc Max Number of Subscriptions Allowed
{mapping, "mqtt.session.max_subscriptions", "emqx.session", [
{default, 0}, {default, 0},
{datatype, integer} {datatype, integer}
]}. ]}.
%% @doc Upgrade QoS?
{mapping, "mqtt.session.upgrade_qos", "emqx.session", [
{default, off},
{datatype, flag}
]}.
%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{mapping, "mqtt.session.max_inflight", "emqx.session", [
{default, 100},
{datatype, integer}
]}.
%% @doc Retry interval for redelivering QoS1/2 messages.
{mapping, "mqtt.session.retry_interval", "emqx.session", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
{mapping, "mqtt.session.max_awaiting_rel", "emqx.session", [
{default, 0},
{datatype, integer}
]}.
%% @doc Awaiting PUBREL Timeout
{mapping, "mqtt.session.await_rel_timeout", "emqx.session", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
%% @doc Enable Stats
{mapping, "mqtt.session.enable_stats", "emqx.session", [
{default, off},
{datatype, flag}
]}.
%% @doc Session Expiry Interval
{mapping, "mqtt.session.expiry_interval", "emqx.session", [
{default, "2h"},
{datatype, {duration, ms}}
]}.
%% @doc Ignore message from self publish
{mapping, "mqtt.session.ignore_loop_deliver", "emqx.session", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.session", fun(Conf) ->
[{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)},
{upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
{retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
{max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
{await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
{enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)},
{expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)},
{ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT MQueue
%%--------------------------------------------------------------------
%% @doc Type: simple | priority
{mapping, "mqtt.mqueue.type", "emqx.mqueue", [
{default, simple},
{datatype, atom}
]}.
%% @doc Topic Priority: 0~255, Default is 0
{mapping, "mqtt.mqueue.priority", "emqx.mqueue", [
{default, ""},
{datatype, string}
]}.
%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit.
{mapping, "mqtt.mqueue.max_length", "emqx.mqueue", [
{default, 0},
{datatype, integer}
]}.
%% @doc Low-water mark of queued messages
{mapping, "mqtt.mqueue.low_watermark", "emqx.mqueue", [
{default, "20%"},
{datatype, {percent, float}}
]}.
%% @doc High-water mark of queued messages
{mapping, "mqtt.mqueue.high_watermark", "emqx.mqueue", [
{default, "60%"},
{datatype, {percent, float}}
]}.
%% @doc Queue Qos0 messages?
{mapping, "mqtt.mqueue.store_qos0", "emqx.mqueue", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.mqueue", fun(Conf) ->
Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)},
{max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)},
{low_watermark, cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf)},
{high_watermark, cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf)},
{store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}],
case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of
undefined -> Opts;
V -> [{priority,
[begin [T, P] = string:tokens(S, "="),
{T, list_to_integer(P)}
end || S <- string:tokens(V, ",")]} | Opts]
end
end}.
%%--------------------------------------------------------------------
%% MQTT Broker
%%--------------------------------------------------------------------
{mapping, "mqtt.broker.sys_interval", "emqx.broker_sys_interval", [
{datatype, {duration, ms}},
{default, "1m"}
]}.
%%--------------------------------------------------------------------
%% MQTT PubSub
%%--------------------------------------------------------------------
{mapping, "mqtt.pubsub.pool_size", "emqx.pubsub", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.pubsub.async", "emqx.pubsub", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.pubsub", fun(Conf) ->
[{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
{async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Bridge
%%--------------------------------------------------------------------
{mapping, "mqtt.bridge.max_queue_len", "emqx.bridge", [
{default, 10000},
{datatype, integer}
]}.
{mapping, "mqtt.bridge.ping_down_interval", "emqx.bridge", [
{datatype, {duration, ms}},
{default, "1s"}
]}.
{translation, "emqx.bridge", fun(Conf) ->
[{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
{ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
end}.
%%-------------------------------------------------------------------
%% Plugins
%%-------------------------------------------------------------------
{mapping, "mqtt.plugins.etc_dir", "emqx.plugins_etc_dir", [
{datatype, string}
]}.
{mapping, "mqtt.plugins.loaded_file", "emqx.plugins_loaded_file", [
{datatype, string}
]}.
{mapping, "mqtt.plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
{mapping, "module.presence", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.presence.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.subscription", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.subscription.$id.topic", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.subscription.$id.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.rewrite", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{translation, "emqx.modules", fun(Conf) ->
Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
QosList = [Qos || {_, Qos} <- lists:sort([{I, Qos} || {[_,"subscription", I,"qos"], Qos} <- List])],
TopicList = [iolist_to_binary(Topic) || {_, Topic} <-
lists:sort([{I, Topic} || {[_,"subscription", I, "topic"], Topic} <- List])],
lists:zip(TopicList, QosList)
end,
Rewrites = fun() ->
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
lists:map(fun({[_, "rewrite", "rule", I], Rule}) ->
[Topic, Re, Dest] = string:tokens(Rule, " "),
{rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, Rules)
end,
lists:append([
case cuttlefish:conf_get("module.presence", Conf) of %% Presence
true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}];
false -> []
end,
case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription
true -> [{emqx_mod_subscription, Subscriptions()}];
false -> []
end,
case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite
true -> [{emqx_mod_rewrite, Rewrites()}];
false -> []
end
])
end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Listeners %% Listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -928,7 +599,7 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.tcp.$name.max_clients", "emqx.listeners", [ {mapping, "listener.tcp.$name.max_connections", "emqx.listeners", [
{default, 1024}, {default, 1024},
{datatype, integer} {datatype, integer}
]}. ]}.
@ -963,7 +634,8 @@ end}.
]}. ]}.
{mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [ {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn]}} {default, false},
{datatype, {enum, [true, false]}}
]}. ]}.
{mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [
@ -1023,7 +695,7 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.ssl.$name.max_clients", "emqx.listeners", [ {mapping, "listener.ssl.$name.max_connections", "emqx.listeners", [
{default, 1024}, {default, 1024},
{datatype, integer} {datatype, integer}
]}. ]}.
@ -1168,7 +840,7 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.ws.$name.max_clients", "emqx.listeners", [ {mapping, "listener.ws.$name.max_connections", "emqx.listeners", [
{default, 1024}, {default, 1024},
{datatype, integer} {datatype, integer}
]}. ]}.
@ -1185,10 +857,20 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [
{default, undefined},
{datatype, string}
]}.
{mapping, "listener.ws.$name.access.$id", "emqx.listeners", [ {mapping, "listener.ws.$name.access.$id", "emqx.listeners", [
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "listener.ws.$name.verify_protocol_header", "emqx.listeners", [
{default, on},
{datatype, flag}
]}.
{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ {mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [
{datatype, string}, {datatype, string},
hidden hidden
@ -1264,7 +946,7 @@ end}.
{datatype, integer} {datatype, integer}
]}. ]}.
{mapping, "listener.wss.$name.max_clients", "emqx.listeners", [ {mapping, "listener.wss.$name.max_connections", "emqx.listeners", [
{default, 1024}, {default, 1024},
{datatype, integer} {datatype, integer}
]}. ]}.
@ -1285,6 +967,11 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "listener.wss.$name.verify_protocol_header", "emqx.listeners", [
{default, on},
{datatype, flag}
]}.
{mapping, "listener.wss.$name.access.$id", "emqx.listeners", [ {mapping, "listener.wss.$name.access.$id", "emqx.listeners", [
{datatype, string} {datatype, string}
]}. ]}.
@ -1422,16 +1109,23 @@ end}.
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
Ratelimit = fun(undefined) ->
undefined;
(S) ->
list_to_tuple([list_to_integer(Token) || Token <- string:tokens(S, ",")])
end,
LisOpts = fun(Prefix) -> LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)},
{max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)},
{tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
%%{rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
{verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
@ -1488,124 +1182,406 @@ end}.
end end
end, end,
ApiListeners = fun(Type, Name) ->
Prefix = string:join(["listener", Type, Name], "."),
case cuttlefish:conf_get(Prefix, Conf, undefined) of
undefined ->
[];
ListenOn ->
SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end,
[{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}]
end
end,
lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)]
++ ++
[SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
++
[ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn}
<- cuttlefish_variable:filter_by_prefix("listener.api", Conf)])
end}. end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT REST API Listeners %% Zones
%%--------------------------------------------------------------------
{mapping, "listener.api.$name", "emqx.listeners", [ %% @doc Idle timeout of the MQTT connection.
{datatype, [integer, ip]} {mapping, "zone.$name.idle_timeout", "emqx.zones", [
]}. {default, "15s"},
{mapping, "listener.api.$name.acceptors", "emqx.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "listener.api.$name.max_clients", "emqx.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "listener.api.$name.rate_limit", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.access.$id", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.api.$name.backlog", "emqx.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [
{datatype, {duration, ms}},
{default, "15s"}
]}.
{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [
{datatype, flag},
{default, on}
]}.
{mapping, "listener.api.$name.recbuf", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.sndbuf", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.buffer", "emqx.listeners", [
{datatype, bytesize},
hidden
]}.
{mapping, "listener.api.$name.tune_buffer", "emqx.listeners", [
{datatype, flag},
hidden
]}.
{mapping, "listener.api.$name.nodelay", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.api.$name.reuseaddr", "emqx.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "listener.api.$name.handshake_timeout", "emqx.listeners", [
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.
{mapping, "listener.api.$name.keyfile", "emqx.listeners", [ %% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Enable per connection statistics.
{mapping, "zone.$name.enable_stats", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Publish limit of the MQTT connections.
{mapping, "zone.$name.publish_limit", "emqx.zones", [
{default, undefined},
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "listener.api.$name.certfile", "emqx.listeners", [ %% @doc Max Packet Size Allowed, 64K by default.
{datatype, string} {mapping, "zone.$name.max_packet_size", "emqx.zones", [
{default, "64KB"},
{datatype, bytesize}
]}. ]}.
{mapping, "listener.api.$name.cacertfile", "emqx.listeners", [ %% @doc Set the Max ClientId Length Allowed.
{datatype, string} {mapping, "zone.$name.max_clientid_len", "emqx.zones", [
{default, 65535},
{datatype, integer}
]}. ]}.
{mapping, "listener.api.$name.verify", "emqx.listeners", [ %% @doc Set the Maximum topic levels.
{datatype, atom} {mapping, "zone.$name.max_topic_levels", "emqx.zones", [
{default, 0},
{datatype, integer}
]}. ]}.
{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqx.listeners", [ %% @doc Set the Maximum QoS allowed.
{mapping, "zone.$name.max_qos_allowed", "emqx.zones", [
{default, 2},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
%% @doc Set the Maximum topic alias.
{mapping, "zone.$name.max_topic_alias", "emqx.zones", [
{default, 0},
{datatype, integer}
]}.
%% @doc Whether the server supports retained messages.
{mapping, "zone.$name.retain_available", "emqx.zones", [
{default, true},
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc Whether the Server supports Wildcard Subscriptions.
{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether the Server supports Shared Subscriptions.
{mapping, "zone.$name.shared_subscription", "emqx.zones", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
%% @doc Keepalive backoff
{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
{default, 0.75},
{datatype, float}
]}.
%% @doc Max Number of Subscriptions Allowed.
{mapping, "zone.$name.max_subscriptions", "emqx.zones", [
{default, 0},
{datatype, integer}
]}.
%% @doc Upgrade QoS according to subscription?
{mapping, "zone.$name.upgrade_qos", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{mapping, "zone.$name.max_inflight", "emqx.zones", [
{default, 0},
{datatype, integer}
]}.
%% @doc Retry interval for redelivering QoS1/2 messages.
{mapping, "zone.$name.retry_interval", "emqx.zones", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
{default, 0},
{datatype, integer}
]}.
%% @doc Awaiting PUBREL timeout
{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
{default, "60s"},
{datatype, {duration, ms}}
]}.
%% @doc Ignore message from self publish
{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%% @doc Session Expiry Interval
{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [
{default, "2h"},
{datatype, {duration, ms}}
]}.
%% @doc Max queue length. Enqueued messages when persistent client
%% disconnected, or inflight window is full. 0 means no limit.
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
{default, 0},
{datatype, integer}
]}.
%% @doc Queue Qos0 messages?
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.zones", fun(Conf) ->
maps:to_list(
lists:foldl(
fun({["zone", Name, Opt], Val}, Acc) ->
ZName = list_to_atom(Name),
case maps:find(ZName, Acc) of
{ok, Opts} ->
maps:put(ZName, [{list_to_atom(Opt), Val} | Opts], Acc);
error ->
maps:put(ZName, [{list_to_atom(Opt), Val}], Acc)
end
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
end}.
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.$name.type", "emqx.bridges", [
{default, local},
{datatype, {enum, [local,remote]}}
]}.
{mapping, "bridge.$name.address", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.proto_ver", "emqx.bridges", [
{datatype, {enum, [mqtt3, mqtt4, mqtt5]}}
]}.
{mapping, "bridge.$name.client_id", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.clean_start", "emqx.bridges", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "bridge.$name.username", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.password", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.mountpoint", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.cacertfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.certfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.keyfile", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.ciphers", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.max_pending_messages", "emqx.bridges", [
{default, 10000},
{datatype, integer}
]}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, s}}
]}.
{mapping, "bridge.$name.tls_versions", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.subscription.$id.qos", "emqx.bridges", [
{datatype, integer}
]}.
{translation, "emqx.bridges", fun(Conf) ->
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
(certfile) -> true;
(keyfile) -> true;
(ciphers) -> true;
(tls_versions) -> true;
(_Opt) -> false
end,
Parse = fun(tls_versions, Vers) ->
{versions, [list_to_atom(S) || S <- Split(Vers)]};
(ciphers, Ciphers) ->
{ciphers, Split(Ciphers)};
(Opt, Val) ->
{Opt, Val}
end,
Merge = fun(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts);
false ->
[{Opt, Val}|Opts]
end
end,
Subscriptions = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
[QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
end,
maps:to_list(
lists:foldl(
fun({["bridge", Name, Opt], Val}, Acc) ->
maps:update_with(list_to_atom(Name),
fun(Opts) ->
Merge(list_to_atom(Opt), Val, Opts)
end, [{subscriptions, Subscriptions(Name)}], Acc);
(_, Acc) -> Acc
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))))
end}.
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
{mapping, "module.presence", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.presence.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.subscription", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.subscription.$id.topic", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.subscription.$id.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.rewrite", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{translation, "emqx.modules", fun(Conf) ->
Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
QosList = [Qos || {_, Qos} <- lists:sort([{I, Qos} || {[_,"subscription", I,"qos"], Qos} <- List])],
TopicList = [iolist_to_binary(Topic) || {_, Topic} <-
lists:sort([{I, Topic} || {[_,"subscription", I, "topic"], Topic} <- List])],
lists:zip(TopicList, QosList)
end,
Rewrites = fun() ->
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
lists:map(fun({[_, "rewrite", "rule", I], Rule}) ->
[Topic, Re, Dest] = string:tokens(Rule, " "),
{rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, Rules)
end,
lists:append([
case cuttlefish:conf_get("module.presence", Conf) of %% Presence
true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}];
false -> []
end,
case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription
true -> [{emqx_mod_subscription, Subscriptions()}];
false -> []
end,
case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite
true -> [{emqx_mod_rewrite, Rewrites()}];
false -> []
end
])
end}.
%%-------------------------------------------------------------------
%% Plugins
%%-------------------------------------------------------------------
{mapping, "plugins.etc_dir", "emqx.plugins_etc_dir", [
{datatype, string}
]}.
{mapping, "plugins.loaded_file", "emqx.plugins_loaded_file", [
{datatype, string}
]}.
{mapping, "plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% Broker
%%--------------------------------------------------------------------
{mapping, "broker.sys_interval", "emqx.broker_sys_interval", [
{datatype, {duration, ms}},
{default, "1m"}
]}.
{mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy", [
{default, quorum},
{datatype, {enum, [local,one,quorum,all]}}
]}.
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
{default, random},
{datatype, {enum, [random, round_robbin, hash]}}
]}.
{mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [
{default, on},
{datatype, flag}
]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% System Monitor %% System Monitor
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------