diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index e44c1f462..1dcad8bae 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -71,7 +71,7 @@ open_listener({mqtts, Port, Options}) -> %% open http port open_listener({http, Port, Options}) -> - MFArgs = {emqttd_http, handle, []}, + MFArgs = {emqttd_http, handle_req, []}, mochiweb:start_http(Port, Options, MFArgs). open_listener(Protocol, Port, Options) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 9009c6f25..86c93c6e1 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -34,14 +34,14 @@ -import(proplists, [get_value/2, get_value/3]). --export([handle/1]). +-export([handle_req/1]). -handle(Req) -> - handle(Req:get(method), Req:get(path), Req). +handle_req(Req) -> + handle_req(Req:get(method), Req:get(path), Req). -handle('POST', "/mqtt/publish", Req) -> +handle_req('POST', "/mqtt/publish", Req) -> Params = mochiweb_request:parse_post(Req), - lager:info("HTTP Publish: ~p~n", [Params]), + lager:info("HTTP Publish: ~p", [Params]), case authorized(Req) of true -> Qos = int(get_value("qos", Params, "0")), @@ -64,29 +64,28 @@ handle('POST', "/mqtt/publish", Req) -> Req:respond({401, [], <<"Fobbiden">>}) end; - -handle(_Method, "/mqtt/wsocket", Req) -> - lager:info("Websocket Headers: ~p~n", [Req:get(headers)]), +handle_req(_Method, "/mqtt/wsocket", Req) -> + lager:info("Websocket Connection from: ~s", [Req:get(peer)]), Up = Req:get_header_value("Upgrade"), case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of true -> - emqttd_websocket:init(Req); + emqttd_websocket:start_link(Req); false -> Req:respond({400, [], <<"Bad Request">>}) end; -handle('GET', "/" ++ File, Req) -> - lager:info("GET File: ~s", [File]), +handle_req('GET', "/" ++ File, Req) -> + lager:info("HTTP GET File: ~s", [File]), mochiweb_request:serve_file(File, docroot(), Req); -handle(_Method, _Path, Req) -> +handle_req(_Method, _Path, Req) -> Req:not_found(). %%------------------------------------------------------------------------------ %% basic authorization %%------------------------------------------------------------------------------ authorized(Req) -> - case mochiweb_request:get_header_value("Authorization", Req) of + case Req:get_header_value("Authorization") of undefined -> false; "Basic " ++ BasicAuth -> diff --git a/apps/emqttd/src/emqttd_websocket.erl b/apps/emqttd/src/emqttd_websocket.erl index 71e59f418..a6aa3f645 100644 --- a/apps/emqttd/src/emqttd_websocket.erl +++ b/apps/emqttd/src/emqttd_websocket.erl @@ -24,19 +24,39 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_websocket). --export([init/1, loop/3]). +-export([start_link/1, init/1, loop/3]). --record(state, {}). +-record(state, {request, + peername, + parse_state, + proto_state, + %packet_opts, + keepalive}). -init(Req) -> - {ReentryWs, _ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:loop/3), - ReentryWs(#state{}). +-record(client_state, {request, sender}). -loop(Payload, State, ReplyChannel) -> +-define(PACKET_OPTS, [{max_clientid_len, 1024}, + {max_packet_size, 4096}]). + +start_link(Req) -> + {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:loop/3), + {ok, Client} = gen_server:start_link(?MODULE, [Req, ReplyChannel], []), + ReentryWs(#state{client = Client, + parse_state = emqtt_parser:init(?PACKET_OPTS)}). + + +init([Req, ReplyChannel]) -> + {ok, + peername = Req:get_header_value(peername), + %ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), + +loop(Payload, State = #state{parse_state = ParserState}, ReplyChannel) -> io:format("Received data: ~p~n", [Payload]), ReplyChannel(Payload), State. +