diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 9a28e5e0d..b793bcf37 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -53,11 +53,26 @@ namespace() -> gateway. roots() -> [gateway]. fields(gateway) -> - [{stomp, sc_meta(ref(stomp) , #{nullable => {true, recursively}})}, - {mqttsn, sc_meta(ref(mqttsn) , #{nullable => {true, recursively}})}, - {coap, sc_meta(ref(coap) , #{nullable => {true, recursively}})}, - {lwm2m, sc_meta(ref(lwm2m) , #{nullable => {true, recursively}})}, - {exproto, sc_meta(ref(exproto), #{nullable => {true, recursively}})} + [{stomp, sc(ref(stomp), + #{ nullable => {true, recursively} + , desc => "The Stomp Gateway configuration." + })}, + {mqttsn, sc(ref(mqttsn), + #{ nullable => {true, recursively} + , desc => "The MQTT-SN Gateway configuration" + })}, + {coap, sc(ref(coap), + #{ nullable => {true, recursively} + , desc => "The CoAP Gateway configuration" + })}, + {lwm2m, sc(ref(lwm2m), + #{ nullable => {true, recursively} + , desc => "The LwM2M Gateway configuration" + })}, + {exproto, sc(ref(exproto), + #{ nullable => {true, recursively} + , desc => "The Extension Protocol configuration" + })} ]; fields(stomp) -> @@ -66,61 +81,198 @@ fields(stomp) -> ] ++ gateway_common_options(); fields(stomp_frame) -> - [ {max_headers, sc(integer(), 10)} - , {max_headers_length, sc(integer(), 1024)} - , {max_body_length, sc(integer(), 8192)} + [ {max_headers, + sc(integer(), + #{ default => 10 + , desc => "The maximum number of Header" + })} + , {max_headers_length, + sc(integer(), + #{ default => 1024 + , desc => "The maximum string length of the Header Value" + })} + , {max_body_length, + sc(integer(), + #{ default => 65536 + , desc => "Maximum number of bytes of Body allowed per Stomp packet" + })} ]; fields(mqttsn) -> - [ {gateway_id, sc(integer())} - , {broadcast, sc(boolean(), false)} - , {enable_qos3, sc(boolean(), true)} - , {predefined, hoconsc:array(ref(mqttsn_predefined))} + [ {gateway_id, + sc(integer(), + #{ default => 1 + , desc => +"MQTT-SN Gateway Id.
+When the broadcast option is enabled, +the gateway will broadcast ADVERTISE message with this value" + })} + , {broadcast, + sc(boolean(), + #{ default => false + , desc => "Whether to periodically broadcast ADVERTISE messages" + })} + %% TODO: rename + , {enable_qos3, + sc(boolean(), + #{ default => true + , desc => +"Allows connectionless clients to publish messages with a Qos of -1.
+This feature is defined for very simple client implementations +which do not support any other features except this one.
+There is no connection setup nor tear down, no registration nor subscription.
+The client just sends its PUBLISH messages to a GW" + })} + , {predefined, + sc(hoconsc:array(ref(mqttsn_predefined)), + #{ default => [] + , desc => +"The Pre-defined topic ids and topic names.
+A 'pre-defined' topic id is a topic id whose mapping to a topic name +is known in advance by both the client’s application and the gateway" + })} , {listeners, sc(ref(udp_listeners))} ] ++ gateway_common_options(); fields(mqttsn_predefined) -> - [ {id, sc(integer())} - , {topic, sc(binary())} + [ {id, sc(integer(), #{desc => "Topic Id.
Range: 1-65535"})} + , {topic, sc(binary(), #{desc => "Topic Name"})} ]; fields(coap) -> - [ {heartbeat, sc(duration(), <<"30s">>)} - , {connection_required, sc(boolean(), false)} - , {notify_type, sc(hoconsc:union([non, con, qos]), qos)} - , {subscribe_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)} - , {publish_qos, sc(hoconsc:union([qos0, qos1, qos2, coap]), coap)} + [ {heartbeat, + sc(duration(), + #{ default => <<"30s">> + , desc => +"The gateway server required minimum hearbeat interval.
+When connection mode is enabled, this parameter is used to set the minimum +heartbeat interval for the connection to be alive." + })} + , {connection_required, + sc(boolean(), + #{ default => false + , desc => +"Enable or disable connection mode.
+Connection mode is a feature of non-standard protocols. When connection mode +is enabled, it is necessary to maintain the creation, authentication and alive +of connection resources" + })} + , {notify_type, + sc(hoconsc:union([non, con, qos]), + #{ default => qos + , desc => +"The Notification Message will be delivered to the CoAP client if a new message +received on an observed topic. +The type of delivered coap message can be set to:
+1. non: Non-confirmable;
+2. con: Confirmable;
+3. qos: Mapping from QoS type of recevied message, QoS0 -> non, QoS1,2 -> con" + })} + , {subscribe_qos, + sc(hoconsc:union([qos0, qos1, qos2, coap]), + #{ default => coap + , desc => +"The Default QoS Level indicator for subscribe request.
+This option specifies the QoS level for the CoAP Client when establishing a +subscription membership, if the subscribe request is not carried `qos` option. +The indicator can be set to: + - qos0, qos1, qos2: Fixed default QoS level + - coap: Dynamic QoS level by the message type of subscribe request + * qos0: If the subscribe request is non-confirmable + * qos1: If the subscribe request is confirmable" + })} + , {publish_qos, + sc(hoconsc:union([qos0, qos1, qos2, coap]), + #{ default => coap + , desc => +"The Default QoS Level indicator for publish request.
+This option specifies the QoS level for the CoAP Client when publishing a +message to EMQ X PUB/SUB system, if the publish request is not carried `qos` +option. The indicator can be set to: + - qos0, qos1, qos2: Fixed default QoS level + - coap: Dynamic QoS level by the message type of publish request + * qos0: If the publish request is non-confirmable + * qos1: If the publish request is confirmable" + })} , {listeners, sc(ref(udp_listeners))} ] ++ gateway_common_options(); fields(lwm2m) -> - [ {xml_dir, sc(binary(), "etc/lwm2m_xml")} - , {lifetime_min, sc(duration(), "1s")} - , {lifetime_max, sc(duration(), "86400s")} - , {qmode_time_window, sc(duration_s(), "22s")} + [ {xml_dir, + sc(binary(), + #{ default =>"etc/lwm2m_xml" + , desc => "The Directory for LwM2M Resource defination" + })} + , {lifetime_min, + sc(duration(), + #{ default => "1s" + , desc => "Minimum value of lifetime allowed to be set by the LwM2M client" + })} + , {lifetime_max, + sc(duration(), + #{ default => "86400s" + , desc => "Maximum value of lifetime allowed to be set by the LwM2M client" + })} + , {qmode_time_window, + sc(duration_s(), + #{ default => "22s" + , desc => +"The value of the time window during which the network link is considered +valid by the LwM2M Gateway in QMode mode.
+For example, after receiving an update message from a client, any messages +within this time window are sent directly to the LwM2M client, and all messages +beyond this time window are temporarily stored in memory." + })} %% TODO: Support config resource path - , {auto_observe, sc(boolean(), false)} - , {update_msg_publish_condition, sc(hoconsc:union([always, contains_object_list]))} - , {translators, sc_meta(ref(translators), #{nullable => false})} + , {auto_observe, + sc(boolean(), + #{ default => false + , desc => "Automatically observe the object list of REGISTER packet" + })} + %% FIXME: not working now + , {update_msg_publish_condition, + sc(hoconsc:union([always, contains_object_list]), + #{ default => "contains_object_list" + , desc => +"Policy for publishing UPDATE event message to EMQ X.
+ - always: send update events as long as the UPDATE request is received. + - contains_object_list: send update events only if the UPDATE request carries any Object List." + })} + , {translators, + sc(ref(lwm2m_translators), + #{ nullable => false + , desc => "Topic configuration for LwM2M's gateway publishing and subscription" + })} , {listeners, sc(ref(udp_listeners))} ] ++ gateway_common_options(); fields(exproto) -> - [ {server, sc(ref(exproto_grpc_server))} - , {handler, sc(ref(exproto_grpc_handler))} + [ {server, + sc(ref(exproto_grpc_server), + #{ desc => "Configurations for starting the ConnectionAdapter service" + })} + , {handler, + sc(ref(exproto_grpc_handler), + #{ desc => "Configurations for request to ConnectionHandler service" + })} , {listeners, sc(ref(udp_tcp_listeners))} ] ++ gateway_common_options(); fields(exproto_grpc_server) -> - [ {bind, sc(hoconsc:union([ip_port(), integer()]))} - , {ssl, sc_meta(ref(ssl_server_opts), - #{nullable => {true, recursively}})} + [ {bind, + sc(hoconsc:union([ip_port(), integer()]))} + , {ssl, + sc(ref(ssl_server_opts), + #{ nullable => {true, recursively} + })} ]; fields(exproto_grpc_handler) -> [ {address, sc(binary())} - , {ssl, sc_meta(ref(ssl_client_opts), - #{nullable => {true, recursively}})} + , {ssl, + sc(ref(ssl_client_opts), + #{ nullable => {true, recursively} + })} ]; fields(ssl_server_opts) -> @@ -140,12 +292,36 @@ fields(clientinfo_override) -> , {clientid, sc(binary())} ]; -fields(translators) -> - [ {command, sc(ref(translator))} - , {response, sc(ref(translator))} - , {notify, sc(ref(translator))} - , {register, sc(ref(translator))} - , {update, sc(ref(translator))} +fields(lwm2m_translators) -> + [ {command, + sc(ref(translator), + #{ desc => +"The topic for receiving downstream commands.
+For each new LwM2M client that succeeds in going online, the gateway creates +a the subscription relationship to receive downstream commands and send it to +the LwM2M client" + })} + , {response, + sc(ref(translator), + #{ desc => +"The topic for gateway to publish the acknowledge events from LwM2M client" + })} + , {notify, + sc(ref(translator), + #{ desc => +"The topic for gateway to publish the notify events from LwM2M client.
+After succeed observe a resource of LwM2M client, Gateway will send the notifyevents via this topic, if the client reports any resource changes" + })} + , {register, + sc(ref(translator), + #{ desc => +"The topic for gateway to publish the register events from LwM2M client.
" + })} + , {update, + sc(ref(translator), + #{ desc => +"The topic for gateway to publish the update events from LwM2M client.
" + })} ]; fields(translator) -> @@ -180,9 +356,11 @@ fields(tcp_listener) -> fields(ssl_listener) -> fields(tcp_listener) ++ - [{ssl, sc_meta(hoconsc:ref(emqx_schema, "listener_ssl_opts"), - #{desc => "SSL listener options"})}]; - + [{ssl, + sc(hoconsc:ref(emqx_schema, "listener_ssl_opts"), + #{ desc => "SSL listener options" + })} + ]; fields(udp_listener) -> [ @@ -192,11 +370,10 @@ fields(udp_listener) -> common_listener_opts(); fields(dtls_listener) -> - [ {acceptors, sc(integer(), 16)} + [ {acceptors, sc(integer(), #{default => 16})} ] ++ fields(udp_listener) ++ - [{dtls, sc_meta(ref(dtls_opts), - #{desc => "DTLS listener options"})}]; + [{dtls, sc(ref(dtls_opts), #{desc => "DTLS listener options"})}]; fields(udp_opts) -> [ {active_n, sc(integer(), 100)} @@ -215,66 +392,113 @@ fields(dtls_opts) -> }, false). authentication() -> - sc_meta(hoconsc:union( - [ hoconsc:ref(emqx_authn_mnesia, config) - , hoconsc:ref(emqx_authn_mysql, config) - , hoconsc:ref(emqx_authn_pgsql, config) - , hoconsc:ref(emqx_authn_mongodb, standalone) - , hoconsc:ref(emqx_authn_mongodb, 'replica-set') - , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') - , hoconsc:ref(emqx_authn_redis, standalone) - , hoconsc:ref(emqx_authn_redis, cluster) - , hoconsc:ref(emqx_authn_redis, sentinel) - , hoconsc:ref(emqx_authn_http, get) - , hoconsc:ref(emqx_authn_http, post) - , hoconsc:ref(emqx_authn_jwt, 'hmac-based') - , hoconsc:ref(emqx_authn_jwt, 'public-key') - , hoconsc:ref(emqx_authn_jwt, 'jwks') - , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) - ]), - #{nullable => {true, recursively}, - desc => + sc(hoconsc:union( + [ hoconsc:ref(emqx_authn_mnesia, config) + , hoconsc:ref(emqx_authn_mysql, config) + , hoconsc:ref(emqx_authn_pgsql, config) + , hoconsc:ref(emqx_authn_mongodb, standalone) + , hoconsc:ref(emqx_authn_mongodb, 'replica-set') + , hoconsc:ref(emqx_authn_mongodb, 'sharded-cluster') + , hoconsc:ref(emqx_authn_redis, standalone) + , hoconsc:ref(emqx_authn_redis, cluster) + , hoconsc:ref(emqx_authn_redis, sentinel) + , hoconsc:ref(emqx_authn_http, get) + , hoconsc:ref(emqx_authn_http, post) + , hoconsc:ref(emqx_authn_jwt, 'hmac-based') + , hoconsc:ref(emqx_authn_jwt, 'public-key') + , hoconsc:ref(emqx_authn_jwt, 'jwks') + , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) + ]), + #{ nullable => {true, recursively} + , desc => """Default authentication configs for all of the gateway listeners.
For per-listener overrides see authentication -in listener configs"""}). +in listener configs""" + }). gateway_common_options() -> - [ {enable, sc(boolean(), true)} - , {enable_stats, sc(boolean(), true)} - , {idle_timeout, sc(duration(), <<"30s">>)} - , {mountpoint, sc(binary(), <<>>)} - , {clientinfo_override, sc(ref(clientinfo_override))} + [ {enable, + sc(boolean(), + #{ default => true + , desc => "Whether to enable this gateway" + })} + , {enable_stats, + sc(boolean(), + #{ default => true + , desc => "Whether to enable client process statistic" + })} + , {idle_timeout, + sc(duration(), + #{ default => <<"30s">> + , desc => +"The idle time of the client connection process.
+it has two purposes: +1. A newly created client process that does not receive any client requests + after that time will be closed directly. +2. A running client process that does not receive any client requests after + this time will go into hibernation to save resources." + })} + , {mountpoint, + sc(binary(), + #{ default => <<>> + %% TODO: variable support? + , desc => "" + })} + , {clientinfo_override, + sc(ref(clientinfo_override), + #{ desc => "" + })} , {authentication, authentication()} ]. common_listener_opts() -> - [ {enable, sc(boolean(), true)} - , {bind, sc(hoconsc:union([ip_port(), integer()]))} - , {max_connections, sc(integer(), 1024)} - , {max_conn_rate, sc(integer())} + [ {enable, + sc(boolean(), + #{ default => true + })} + , {bind, + sc(hoconsc:union([ip_port(), integer()]), + #{})} + , {max_connections, + sc(integer(), + #{ default => 1024 + })} + , {max_conn_rate, + sc(integer(), + #{ default => 1000 + })} , {authentication, authentication()} - , {mountpoint, sc(binary(), undefined)} - , {access_rules, sc(hoconsc:array(string()), [])} + , {mountpoint, + sc(binary(), + #{ default => undefined + })} + , {access_rules, + sc(hoconsc:array(string()), + #{ default => [] + })} ]. tcp_opts() -> - [{tcp, sc_meta(ref(emqx_schema, "tcp_opts"), #{})}]. + [{tcp, sc(ref(emqx_schema, "tcp_opts"), #{})}]. udp_opts() -> - [{udp, sc_meta(ref(udp_opts), #{})}]. + [{udp, sc(ref(udp_opts), #{})}]. proxy_protocol_opts() -> - [ {proxy_protocol, sc(boolean(), false)} - , {proxy_protocol_timeout, sc(duration(), "15s")} + [ {proxy_protocol, + sc(boolean(), + #{ default => false + })} + , {proxy_protocol_timeout, + sc(duration(), + #{ default => "15s" + })} ]. sc(Type) -> - sc_meta(Type, #{}). + sc(Type, #{}). -sc(Type, Default) -> - sc_meta(Type, #{default => Default}). - -sc_meta(Type, Meta) -> +sc(Type, Meta) -> hoconsc:mk(Type, Meta). map(Name, Type) -> diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index dc967681a..0b90d843c 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -51,6 +51,7 @@ stop() -> %%-------------------------------------------------------------------- init([GwId, Port]) -> + %% FIXME: Duration = application:get_env(emqx_sn, advertise_duration, ?DEFAULT_DURATION), {ok, Sock} = gen_udp:open(0, [binary, {broadcast, true}]), {ok, ensure_advertise(#state{gwid = GwId, addrs = boradcast_addrs(), diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index 1000eec58..d75f77abe 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -123,7 +123,7 @@ initial_parse_state(Opts) -> limit(Opts) -> #frame_limit{ max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM), - max_header_length = g(max_header_length, Opts, ?MAX_BODY_LENGTH), + max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH), max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH) }.