docs(gw): add docs for gateway schema

This commit is contained in:
JianBo He 2021-10-25 16:40:02 +08:00 committed by JianBo He
parent bc3f3b4c55
commit 1f067d6f3b
3 changed files with 311 additions and 86 deletions

View File

@ -53,11 +53,26 @@ namespace() -> gateway.
roots() -> [gateway].
fields(gateway) ->
[{stomp, sc_meta(ref(stomp) , #{nullable => {true, recursively}})},
{mqttsn, sc_meta(ref(mqttsn) , #{nullable => {true, recursively}})},
{coap, sc_meta(ref(coap) , #{nullable => {true, recursively}})},
{lwm2m, sc_meta(ref(lwm2m) , #{nullable => {true, recursively}})},
{exproto, sc_meta(ref(exproto), #{nullable => {true, recursively}})}
[{stomp, sc(ref(stomp),
#{ nullable => {true, recursively}
, desc => "The Stomp Gateway configuration."
})},
{mqttsn, sc(ref(mqttsn),
#{ nullable => {true, recursively}
, desc => "The MQTT-SN Gateway configuration"
})},
{coap, sc(ref(coap),
#{ nullable => {true, recursively}
, desc => "The CoAP Gateway configuration"
})},
{lwm2m, sc(ref(lwm2m),
#{ nullable => {true, recursively}
, desc => "The LwM2M Gateway configuration"
})},
{exproto, sc(ref(exproto),
#{ nullable => {true, recursively}
, desc => "The Extension Protocol configuration"
})}
];
fields(stomp) ->
@ -66,61 +81,198 @@ fields(stomp) ->
] ++ gateway_common_options();
fields(stomp_frame) ->
[ {max_headers, sc(integer(), 10)}
, {max_headers_length, sc(integer(), 1024)}
, {max_body_length, sc(integer(), 8192)}
[ {max_headers,
sc(integer(),
#{ default => 10
, desc => "The maximum number of Header"
})}
, {max_headers_length,
sc(integer(),
#{ default => 1024
, desc => "The maximum string length of the Header Value"
})}
, {max_body_length,
sc(integer(),
#{ default => 65536
, desc => "Maximum number of bytes of Body allowed per Stomp packet"
})}
];
fields(mqttsn) ->
[ {gateway_id, sc(integer())}
, {broadcast, sc(boolean(), false)}
, {enable_qos3, sc(boolean(), true)}
, {predefined, hoconsc:array(ref(mqttsn_predefined))}
[ {gateway_id,
sc(integer(),
#{ default => 1
, desc =>
"MQTT-SN Gateway Id.<br>
When the <code>broadcast</code> option is enabled,
the gateway will broadcast ADVERTISE message with this value"
})}
, {broadcast,
sc(boolean(),
#{ default => false
, desc => "Whether to periodically broadcast ADVERTISE messages"
})}
%% TODO: rename
, {enable_qos3,
sc(boolean(),
#{ default => true
, desc =>
"Allows connectionless clients to publish messages with a Qos of -1.<br>
This feature is defined for very simple client implementations
which do not support any other features except this one.<br>
There is no connection setup nor tear down, no registration nor subscription.<br>
The client just sends its PUBLISH messages to a GW"
})}
, {predefined,
sc(hoconsc:array(ref(mqttsn_predefined)),
#{ default => []
, desc =>
"The Pre-defined topic ids and topic names.<br>
A 'pre-defined' topic id is a topic id whose mapping to a topic name
is known in advance by both the clients application and the gateway"
})}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(mqttsn_predefined) ->
[ {id, sc(integer())}
, {topic, sc(binary())}
[ {id, sc(integer(), #{desc => "Topic Id.<br>Range: 1-65535"})}
, {topic, sc(binary(), #{desc => "Topic Name"})}
];
fields(coap) ->
[ {heartbeat, sc(duration(), <<"30s">>)}
, {connection_required, sc(boolean(), false)}
, {notify_type, sc(hoconsc:union([non, con, qos]), qos)}
, {subscribe_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
, {publish_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)}
[ {heartbeat,
sc(duration(),
#{ default => <<"30s">>
, desc =>
"The gateway server required minimum hearbeat interval.<br>
When connection mode is enabled, this parameter is used to set the minimum
heartbeat interval for the connection to be alive."
})}
, {connection_required,
sc(boolean(),
#{ default => false
, desc =>
"Enable or disable connection mode.<br>
Connection mode is a feature of non-standard protocols. When connection mode
is enabled, it is necessary to maintain the creation, authentication and alive
of connection resources"
})}
, {notify_type,
sc(hoconsc:union([non, con, qos]),
#{ default => qos
, desc =>
"The Notification Message will be delivered to the CoAP client if a new message
received on an observed topic.
The type of delivered coap message can be set to:<br>
1. non: Non-confirmable;<br>
2. con: Confirmable;<br>
3. qos: Mapping from QoS type of recevied message, QoS0 -> non, QoS1,2 -> con"
})}
, {subscribe_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for subscribe request.<br>
This option specifies the QoS level for the CoAP Client when establishing a
subscription membership, if the subscribe request is not carried `qos` option.
The indicator can be set to:
- qos0, qos1, qos2: Fixed default QoS level
- coap: Dynamic QoS level by the message type of subscribe request
* qos0: If the subscribe request is non-confirmable
* qos1: If the subscribe request is confirmable"
})}
, {publish_qos,
sc(hoconsc:union([qos0, qos1, qos2, coap]),
#{ default => coap
, desc =>
"The Default QoS Level indicator for publish request.<br>
This option specifies the QoS level for the CoAP Client when publishing a
message to EMQ X PUB/SUB system, if the publish request is not carried `qos`
option. The indicator can be set to:
- qos0, qos1, qos2: Fixed default QoS level
- coap: Dynamic QoS level by the message type of publish request
* qos0: If the publish request is non-confirmable
* qos1: If the publish request is confirmable"
})}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(lwm2m) ->
[ {xml_dir, sc(binary(), "etc/lwm2m_xml")}
, {lifetime_min, sc(duration(), "1s")}
, {lifetime_max, sc(duration(), "86400s")}
, {qmode_time_window, sc(duration_s(), "22s")}
[ {xml_dir,
sc(binary(),
#{ default =>"etc/lwm2m_xml"
, desc => "The Directory for LwM2M Resource defination"
})}
, {lifetime_min,
sc(duration(),
#{ default => "1s"
, desc => "Minimum value of lifetime allowed to be set by the LwM2M client"
})}
, {lifetime_max,
sc(duration(),
#{ default => "86400s"
, desc => "Maximum value of lifetime allowed to be set by the LwM2M client"
})}
, {qmode_time_window,
sc(duration_s(),
#{ default => "22s"
, desc =>
"The value of the time window during which the network link is considered
valid by the LwM2M Gateway in QMode mode.<br>
For example, after receiving an update message from a client, any messages
within this time window are sent directly to the LwM2M client, and all messages
beyond this time window are temporarily stored in memory."
})}
%% TODO: Support config resource path
, {auto_observe, sc(boolean(), false)}
, {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))}
, {translators, sc_meta(ref(translators), #{nullable => false})}
, {auto_observe,
sc(boolean(),
#{ default => false
, desc => "Automatically observe the object list of REGISTER packet"
})}
%% FIXME: not working now
, {update_msg_publish_condition,
sc(hoconsc:union([always, contains_object_list]),
#{ default => "contains_object_list"
, desc =>
"Policy for publishing UPDATE event message to EMQ X.<br>
- always: send update events as long as the UPDATE request is received.
- contains_object_list: send update events only if the UPDATE request carries any Object List."
})}
, {translators,
sc(ref(lwm2m_translators),
#{ nullable => false
, desc => "Topic configuration for LwM2M's gateway publishing and subscription"
})}
, {listeners, sc(ref(udp_listeners))}
] ++ gateway_common_options();
fields(exproto) ->
[ {server, sc(ref(exproto_grpc_server))}
, {handler, sc(ref(exproto_grpc_handler))}
[ {server,
sc(ref(exproto_grpc_server),
#{ desc => "Configurations for starting the <code>ConnectionAdapter</code> service"
})}
, {handler,
sc(ref(exproto_grpc_handler),
#{ desc => "Configurations for request to <code>ConnectionHandler</code> service"
})}
, {listeners, sc(ref(udp_tcp_listeners))}
] ++ gateway_common_options();
fields(exproto_grpc_server) ->
[ {bind, sc(hoconsc:union([ip_port(), integer()]))}
, {ssl, sc_meta(ref(ssl_server_opts),
#{nullable => {true, recursively}})}
[ {bind,
sc(hoconsc:union([ip_port(), integer()]))}
, {ssl,
sc(ref(ssl_server_opts),
#{ nullable => {true, recursively}
})}
];
fields(exproto_grpc_handler) ->
[ {address, sc(binary())}
, {ssl, sc_meta(ref(ssl_client_opts),
#{nullable => {true, recursively}})}
, {ssl,
sc(ref(ssl_client_opts),
#{ nullable => {true, recursively}
})}
];
fields(ssl_server_opts) ->
@ -140,12 +292,36 @@ fields(clientinfo_override) ->
, {clientid, sc(binary())}
];
fields(translators) ->
[ {command, sc(ref(translator))}
, {response, sc(ref(translator))}
, {notify, sc(ref(translator))}
, {register, sc(ref(translator))}
, {update, sc(ref(translator))}
fields(lwm2m_translators) ->
[ {command,
sc(ref(translator),
#{ desc =>
"The topic for receiving downstream commands.<br>
For each new LwM2M client that succeeds in going online, the gateway creates
a the subscription relationship to receive downstream commands and send it to
the LwM2M client"
})}
, {response,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the acknowledge events from LwM2M client"
})}
, {notify,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the notify events from LwM2M client.<br>
After succeed observe a resource of LwM2M client, Gateway will send the notifyevents via this topic, if the client reports any resource changes"
})}
, {register,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the register events from LwM2M client.<br>"
})}
, {update,
sc(ref(translator),
#{ desc =>
"The topic for gateway to publish the update events from LwM2M client.<br>"
})}
];
fields(translator) ->
@ -180,9 +356,11 @@ fields(tcp_listener) ->
fields(ssl_listener) ->
fields(tcp_listener) ++
[{ssl, sc_meta(hoconsc:ref(emqx_schema, "listener_ssl_opts"),
#{desc => "SSL listener options"})}];
[{ssl,
sc(hoconsc:ref(emqx_schema, "listener_ssl_opts"),
#{ desc => "SSL listener options"
})}
];
fields(udp_listener) ->
[
@ -192,11 +370,10 @@ fields(udp_listener) ->
common_listener_opts();
fields(dtls_listener) ->
[ {acceptors, sc(integer(), 16)}
[ {acceptors, sc(integer(), #{default => 16})}
] ++
fields(udp_listener) ++
[{dtls, sc_meta(ref(dtls_opts),
#{desc => "DTLS listener options"})}];
[{dtls, sc(ref(dtls_opts), #{desc => "DTLS listener options"})}];
fields(udp_opts) ->
[ {active_n, sc(integer(), 100)}
@ -215,66 +392,113 @@ fields(dtls_opts) ->
}, false).
authentication() ->
sc_meta(hoconsc:union(
[ hoconsc:ref(emqx_authn_mnesia, config)
, hoconsc:ref(emqx_authn_mysql, config)
, hoconsc:ref(emqx_authn_pgsql, config)
, hoconsc:ref(emqx_authn_mongodb, standalone)
, hoconsc:ref(emqx_authn_mongodb, 'replica-set')
, hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster')
, hoconsc:ref(emqx_authn_redis, standalone)
, hoconsc:ref(emqx_authn_redis, cluster)
, hoconsc:ref(emqx_authn_redis, sentinel)
, hoconsc:ref(emqx_authn_http, get)
, hoconsc:ref(emqx_authn_http, post)
, hoconsc:ref(emqx_authn_jwt, 'hmac-based')
, hoconsc:ref(emqx_authn_jwt, 'public-key')
, hoconsc:ref(emqx_authn_jwt, 'jwks')
, hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
]),
#{nullable => {true, recursively},
desc =>
sc(hoconsc:union(
[ hoconsc:ref(emqx_authn_mnesia, config)
, hoconsc:ref(emqx_authn_mysql, config)
, hoconsc:ref(emqx_authn_pgsql, config)
, hoconsc:ref(emqx_authn_mongodb, standalone)
, hoconsc:ref(emqx_authn_mongodb, 'replica-set')
, hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster')
, hoconsc:ref(emqx_authn_redis, standalone)
, hoconsc:ref(emqx_authn_redis, cluster)
, hoconsc:ref(emqx_authn_redis, sentinel)
, hoconsc:ref(emqx_authn_http, get)
, hoconsc:ref(emqx_authn_http, post)
, hoconsc:ref(emqx_authn_jwt, 'hmac-based')
, hoconsc:ref(emqx_authn_jwt, 'public-key')
, hoconsc:ref(emqx_authn_jwt, 'jwks')
, hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config)
]),
#{ nullable => {true, recursively}
, desc =>
"""Default authentication configs for all of the gateway listeners.<br>
For per-listener overrides see <code>authentication</code>
in listener configs"""}).
in listener configs"""
}).
gateway_common_options() ->
[ {enable, sc(boolean(), true)}
, {enable_stats, sc(boolean(), true)}
, {idle_timeout, sc(duration(), <<"30s">>)}
, {mountpoint, sc(binary(), <<>>)}
, {clientinfo_override, sc(ref(clientinfo_override))}
[ {enable,
sc(boolean(),
#{ default => true
, desc => "Whether to enable this gateway"
})}
, {enable_stats,
sc(boolean(),
#{ default => true
, desc => "Whether to enable client process statistic"
})}
, {idle_timeout,
sc(duration(),
#{ default => <<"30s">>
, desc =>
"The idle time of the client connection process.<br>
it has two purposes:
1. A newly created client process that does not receive any client requests
after that time will be closed directly.
2. A running client process that does not receive any client requests after
this time will go into hibernation to save resources."
})}
, {mountpoint,
sc(binary(),
#{ default => <<>>
%% TODO: variable support?
, desc => ""
})}
, {clientinfo_override,
sc(ref(clientinfo_override),
#{ desc => ""
})}
, {authentication, authentication()}
].
common_listener_opts() ->
[ {enable, sc(boolean(), true)}
, {bind, sc(hoconsc:union([ip_port(), integer()]))}
, {max_connections, sc(integer(), 1024)}
, {max_conn_rate, sc(integer())}
[ {enable,
sc(boolean(),
#{ default => true
})}
, {bind,
sc(hoconsc:union([ip_port(), integer()]),
#{})}
, {max_connections,
sc(integer(),
#{ default => 1024
})}
, {max_conn_rate,
sc(integer(),
#{ default => 1000
})}
, {authentication, authentication()}
, {mountpoint, sc(binary(), undefined)}
, {access_rules, sc(hoconsc:array(string()), [])}
, {mountpoint,
sc(binary(),
#{ default => undefined
})}
, {access_rules,
sc(hoconsc:array(string()),
#{ default => []
})}
].
tcp_opts() ->
[{tcp, sc_meta(ref(emqx_schema, "tcp_opts"), #{})}].
[{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}].
udp_opts() ->
[{udp, sc_meta(ref(udp_opts), #{})}].
[{udp, sc(ref(udp_opts), #{})}].
proxy_protocol_opts() ->
[ {proxy_protocol, sc(boolean(), false)}
, {proxy_protocol_timeout, sc(duration(), "15s")}
[ {proxy_protocol,
sc(boolean(),
#{ default => false
})}
, {proxy_protocol_timeout,
sc(duration(),
#{ default => "15s"
})}
].
sc(Type) ->
sc_meta(Type, #{}).
sc(Type, #{}).
sc(Type, Default) ->
sc_meta(Type, #{default => Default}).
sc_meta(Type, Meta) ->
sc(Type, Meta) ->
hoconsc:mk(Type, Meta).
map(Name, Type) ->

View File

@ -51,6 +51,7 @@ stop() ->
%%--------------------------------------------------------------------
init([GwId, Port]) ->
%% FIXME:
Duration = application:get_env(emqx_sn, advertise_duration, ?DEFAULT_DURATION),
{ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]),
{ok, ensure_advertise(#state{gwid = GwId, addrs = boradcast_addrs(),

View File

@ -123,7 +123,7 @@ initial_parse_state(Opts) ->
limit(Opts) ->
#frame_limit{
max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM),
max_header_length = g(max_header_length, Opts, ?MAX_BODY_LENGTH),
max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH),
max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH)
}.