Add http to listen on port 8080 for the http REST API
This commit is contained in:
parent
0f7a66f810
commit
1a8cc2e146
24
etc/emq.conf
24
etc/emq.conf
|
@ -446,6 +446,30 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
|
||||||
|
|
||||||
## listener.wss.external.fail_if_no_peer_cert = true
|
## listener.wss.external.fail_if_no_peer_cert = true
|
||||||
|
|
||||||
|
##--------------------------------------------------------------------
|
||||||
|
## External MQTT/REST API Listener
|
||||||
|
|
||||||
|
listener.api.external = 8080
|
||||||
|
|
||||||
|
listener.api.external.acceptors = 4
|
||||||
|
|
||||||
|
listener.api.external.max_clients = 64
|
||||||
|
|
||||||
|
listener.api.external.access.1 = allow all
|
||||||
|
|
||||||
|
## TCP Options
|
||||||
|
listener.api.external.backlog = 1024
|
||||||
|
|
||||||
|
listener.api.external.recbuf = 4KB
|
||||||
|
|
||||||
|
listener.api.external.sndbuf = 4KB
|
||||||
|
|
||||||
|
listener.api.external.buffer = 4KB
|
||||||
|
|
||||||
|
listener.api.external.nodelay = true
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
## System Monitor
|
## System Monitor
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
|
|
|
@ -1027,9 +1027,97 @@ end}.
|
||||||
++
|
++
|
||||||
[SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
|
[SslListeners(Type, Name) || {["listener", Type, Name], ListenOn}
|
||||||
<- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
|
<- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
|
||||||
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
|
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)
|
||||||
|
++ cuttlefish_variable:filter_by_prefix("listener.api", Conf)])
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% MQTT REST API Listeners
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name", "emqttd.listeners", [
|
||||||
|
{datatype, [integer, ip]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.acceptors", "emqttd.listeners", [
|
||||||
|
{default, 8},
|
||||||
|
{datatype, integer}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.max_clients", "emqttd.listeners", [
|
||||||
|
{default, 1024},
|
||||||
|
{datatype, integer}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.zone", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.mountpoint", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.rate_limit", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.access.$id", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.backlog", "emqttd.listeners", [
|
||||||
|
{default, 1024},
|
||||||
|
{datatype, integer}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.recbuf", "emqttd.listeners", [
|
||||||
|
{datatype, bytesize},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.sndbuf", "emqttd.listeners", [
|
||||||
|
{datatype, bytesize},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.buffer", "emqttd.listeners", [
|
||||||
|
{datatype, bytesize},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.tune_buffer", "emqttd.listeners", [
|
||||||
|
{datatype, flag},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.nodelay", "emqttd.listeners", [
|
||||||
|
{datatype, {enum, [true, false]}},
|
||||||
|
hidden
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.handshake_timeout", "emqttd.listeners", [
|
||||||
|
{datatype, {duration, ms}}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.keyfile", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.certfile", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.cacertfile", "emqttd.listeners", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.verify", "emqttd.listeners", [
|
||||||
|
{datatype, atom}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqttd.listeners", [
|
||||||
|
{datatype, {enum, [true, false]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% System Monitor
|
%% System Monitor
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -165,11 +165,14 @@ start_listener({ssl, ListenOn, Opts}) ->
|
||||||
|
|
||||||
%% Start http listener
|
%% Start http listener
|
||||||
start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
|
start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
|
||||||
mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_http, handle_request, []});
|
mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqttd_ws, handle_request, []});
|
||||||
|
|
||||||
%% Start https listener
|
%% Start https listener
|
||||||
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
|
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
|
||||||
mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_http, handle_request, []}).
|
mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqttd_ws, handle_request, []});
|
||||||
|
|
||||||
|
start_listener({Proto, ListenOn, Opts}) when Proto == api ->
|
||||||
|
mochiweb:start_http('mqtt:api', ListenOn, Opts, {emqttd_http, handle_request, []}).
|
||||||
|
|
||||||
start_listener(Proto, ListenOn, Opts) ->
|
start_listener(Proto, ListenOn, Opts) ->
|
||||||
Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),
|
Env = lists:append(emqttd:env(client, []), emqttd:env(protocol, [])),
|
||||||
|
|
|
@ -52,25 +52,6 @@ handle_request('POST', "/mqtt/publish", Req) ->
|
||||||
false -> Req:respond({401, [], <<"Unauthorized">>})
|
false -> Req:respond({401, [], <<"Unauthorized">>})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% MQTT Over WebSocket
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
handle_request('GET', "/mqtt", Req) ->
|
|
||||||
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
|
|
||||||
Upgrade = Req:get_header_value("Upgrade"),
|
|
||||||
Proto = check_protocol_header(Req),
|
|
||||||
case {is_websocket(Upgrade), Proto} of
|
|
||||||
{true, "mqtt" ++ _Vsn} ->
|
|
||||||
emqttd_ws:handle_request(Req);
|
|
||||||
{false, _} ->
|
|
||||||
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
|
|
||||||
Req:respond({400, [], <<"Bad Request">>});
|
|
||||||
{_, Proto} ->
|
|
||||||
lager:error("WebSocket with error Protocol: ~s", [Proto]),
|
|
||||||
Req:respond({400, [], <<"Bad WebSocket Protocol">>})
|
|
||||||
end;
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Get static files
|
%% Get static files
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -83,18 +64,6 @@ handle_request(Method, Path, Req) ->
|
||||||
lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
|
lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
|
||||||
Req:not_found().
|
Req:not_found().
|
||||||
|
|
||||||
check_protocol_header(Req) ->
|
|
||||||
case emqttd:env(websocket_protocol_header, false) of
|
|
||||||
true -> get_protocol_header(Req);
|
|
||||||
false -> "mqtt-v3.1.1"
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_protocol_header(Req) ->
|
|
||||||
case Req:get_header_value("EMQ-WebSocket-Protocol") of
|
|
||||||
undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
|
|
||||||
Proto -> Proto
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% HTTP Publish
|
%% HTTP Publish
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -174,9 +143,6 @@ bool("1") -> true;
|
||||||
bool(<<"0">>) -> false;
|
bool(<<"0">>) -> false;
|
||||||
bool(<<"1">>) -> true.
|
bool(<<"1">>) -> true.
|
||||||
|
|
||||||
is_websocket(Upgrade) ->
|
|
||||||
Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
|
|
||||||
|
|
||||||
docroot() ->
|
docroot() ->
|
||||||
{file, Here} = code:is_loaded(?MODULE),
|
{file, Here} = code:is_loaded(?MODULE),
|
||||||
Dir = filename:dirname(filename:dirname(Here)),
|
Dir = filename:dirname(filename:dirname(Here)),
|
||||||
|
|
|
@ -31,20 +31,53 @@
|
||||||
lager:Level("WsClient(~s): " ++ Format,
|
lager:Level("WsClient(~s): " ++ Format,
|
||||||
[esockd_net:format(State#wsocket_state.peername) | Args])).
|
[esockd_net:format(State#wsocket_state.peername) | Args])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Handle WebSocket Request
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Handle WebSocket Request.
|
|
||||||
handle_request(Req) ->
|
handle_request(Req) ->
|
||||||
{ok, ProtoEnv} = emqttd:env(protocol),
|
handle_request(Req:get(method), Req:get(path), Req).
|
||||||
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
|
|
||||||
Parser = emqttd_parser:initial_state(PacketSize),
|
%%--------------------------------------------------------------------
|
||||||
%% Upgrade WebSocket.
|
%% MQTT Over WebSocket
|
||||||
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
%%--------------------------------------------------------------------
|
||||||
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
handle_request('GET', "/mqtt", Req) ->
|
||||||
ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
|
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
|
||||||
max_packet_size = PacketSize, client_pid = ClientPid}).
|
Upgrade = Req:get_header_value("Upgrade"),
|
||||||
|
Proto = check_protocol_header(Req),
|
||||||
|
case {is_websocket(Upgrade), Proto} of
|
||||||
|
{true, "mqtt" ++ _Vsn} ->
|
||||||
|
{ok, ProtoEnv} = emqttd:env(protocol),
|
||||||
|
PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
|
||||||
|
Parser = emqttd_parser:initial_state(PacketSize),
|
||||||
|
%% Upgrade WebSocket.
|
||||||
|
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
|
||||||
|
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
|
||||||
|
ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
|
||||||
|
max_packet_size = PacketSize, client_pid = ClientPid});
|
||||||
|
{false, _} ->
|
||||||
|
lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
|
||||||
|
Req:respond({400, [], <<"Bad Request">>});
|
||||||
|
{_, Proto} ->
|
||||||
|
lager:error("WebSocket with error Protocol: ~s", [Proto]),
|
||||||
|
Req:respond({400, [], <<"Bad WebSocket Protocol">>})
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_request(Method, Path, Req) ->
|
||||||
|
lager:error("Unexpected WS Request: ~s ~s", [Method, Path]),
|
||||||
|
Req:not_found().
|
||||||
|
|
||||||
|
is_websocket(Upgrade) ->
|
||||||
|
Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
|
||||||
|
|
||||||
|
check_protocol_header(Req) ->
|
||||||
|
case emqttd:env(websocket_protocol_header, false) of
|
||||||
|
true -> get_protocol_header(Req);
|
||||||
|
false -> "mqtt-v3.1.1"
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_protocol_header(Req) ->
|
||||||
|
case Req:get_header_value("EMQ-WebSocket-Protocol") of
|
||||||
|
undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
|
||||||
|
Proto -> Proto
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Receive Loop
|
%% Receive Loop
|
||||||
|
|
Loading…
Reference in New Issue