diff --git a/etc/emq.conf b/etc/emq.conf index 1b78c9ae4..ea7e3e2ba 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -446,6 +446,30 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## listener.wss.external.fail_if_no_peer_cert = true +##-------------------------------------------------------------------- +## External MQTT/REST API Listener + +listener.api.external = 8080 + +listener.api.external.acceptors = 4 + +listener.api.external.max_clients = 64 + +listener.api.external.access.1 = allow all + +## TCP Options +listener.api.external.backlog = 1024 + +listener.api.external.recbuf = 4KB + +listener.api.external.sndbuf = 4KB + +listener.api.external.buffer = 4KB + +listener.api.external.nodelay = true + + + ##------------------------------------------------------------------- ## System Monitor ##------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 5d9617e64..b812c52b6 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1027,9 +1027,97 @@ end}. ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) end}. +%%-------------------------------------------------------------------- +%% MQTT REST API Listeners + +{mapping, "listener.api.$name", "emqttd.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.api.$name.verify", "emqttd.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 992e22da6..272ccffc9 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -165,11 +165,14 @@ start_listener({ssl, ListenOn, Opts}) -> %% Start http listener start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> - mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []}); + mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_ws, handle_request, []}); %% Start https listener start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> - mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}). + mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []}); + +start_listener({Proto, ListenOn, Opts}) when Proto == api -> + mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Proto, ListenOn, Opts) -> Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])), diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 9b50cf34a..cf705ff00 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -52,25 +52,6 @@ handle_request('POST', "/mqtt/publish", Req) -> false -> Req:respond({401, [], <<"Unauthorized">>}) end; -%%-------------------------------------------------------------------- -%% MQTT Over WebSocket -%%-------------------------------------------------------------------- - -handle_request('GET', "/mqtt", Req) -> - lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), - Upgrade = Req:get_header_value("Upgrade"), - Proto = check_protocol_header(Req), - case {is_websocket(Upgrade), Proto} of - {true, "mqtt" ++ _Vsn} -> - emqttd_ws:handle_request(Req); - {false, _} -> - lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), - Req:respond({400, [], <<"Bad Request">>}); - {_, Proto} -> - lager:error("WebSocket with error Protocol: ~s", [Proto]), - Req:respond({400, [], <<"Bad WebSocket Protocol">>}) - end; - %%-------------------------------------------------------------------- %% Get static files %%-------------------------------------------------------------------- @@ -83,18 +64,6 @@ handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). -check_protocol_header(Req) -> - case emqttd:env(websocket_protocol_header, false) of - true -> get_protocol_header(Req); - false -> "mqtt-v3.1.1" - end. - -get_protocol_header(Req) -> - case Req:get_header_value("EMQ-WebSocket-Protocol") of - undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); - Proto -> Proto - end. - %%-------------------------------------------------------------------- %% HTTP Publish %%-------------------------------------------------------------------- @@ -174,9 +143,6 @@ bool("1") -> true; bool(<<"0">>) -> false; bool(<<"1">>) -> true. -is_websocket(Upgrade) -> - Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". - docroot() -> {file, Here} = code:is_loaded(?MODULE), Dir = filename:dirname(filename:dirname(Here)), diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index c6a68150d..dbf0ea08c 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,20 +31,53 @@ lager:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#wsocket_state.peername) | Args])). -%%-------------------------------------------------------------------- -%% Handle WebSocket Request -%%-------------------------------------------------------------------- -%% @doc Handle WebSocket Request. handle_request(Req) -> - {ok, ProtoEnv} = emqttd:env(protocol), - PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), - Parser = emqttd_parser:initial_state(PacketSize), - %% Upgrade WebSocket. - {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), - ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, - max_packet_size = PacketSize, client_pid = ClientPid}). + handle_request(Req:get(method), Req:get(path), Req). + +%%-------------------------------------------------------------------- +%% MQTT Over WebSocket +%%-------------------------------------------------------------------- +handle_request('GET', "/mqtt", Req) -> + lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), + Upgrade = Req:get_header_value("Upgrade"), + Proto = check_protocol_header(Req), + case {is_websocket(Upgrade), Proto} of + {true, "mqtt" ++ _Vsn} -> + {ok, ProtoEnv} = emqttd:env(protocol), + PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE), + Parser = emqttd_parser:initial_state(PacketSize), + %% Upgrade WebSocket. + {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), + {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), + ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, + max_packet_size = PacketSize, client_pid = ClientPid}); + {false, _} -> + lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]), + Req:respond({400, [], <<"Bad Request">>}); + {_, Proto} -> + lager:error("WebSocket with error Protocol: ~s", [Proto]), + Req:respond({400, [], <<"Bad WebSocket Protocol">>}) + end; + +handle_request(Method, Path, Req) -> + lager:error("Unexpected WS Request: ~s ~s", [Method, Path]), + Req:not_found(). + +is_websocket(Upgrade) -> + Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket". + +check_protocol_header(Req) -> + case emqttd:env(websocket_protocol_header, false) of + true -> get_protocol_header(Req); + false -> "mqtt-v3.1.1" + end. + +get_protocol_header(Req) -> + case Req:get_header_value("EMQ-WebSocket-Protocol") of + undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); + Proto -> Proto + end. %%-------------------------------------------------------------------- %% Receive Loop