sync with air

This commit is contained in:
Feng Lee 2015-04-28 12:25:11 +08:00
parent 1b96f93ab7
commit 5afe4a62b4
3 changed files with 39 additions and 20 deletions

View File

@ -71,7 +71,7 @@ open_listener({mqtts, Port, Options}) ->
%% open http port
open_listener({http, Port, Options}) ->
MFArgs = {emqttd_http, handle, []},
MFArgs = {emqttd_http, handle_req, []},
mochiweb:start_http(Port, Options, MFArgs).
open_listener(Protocol, Port, Options) ->

View File

@ -34,14 +34,14 @@
-import(proplists, [get_value/2, get_value/3]).
-export([handle/1]).
-export([handle_req/1]).
handle(Req) ->
handle(Req:get(method), Req:get(path), Req).
handle_req(Req) ->
handle_req(Req:get(method), Req:get(path), Req).
handle('POST', "/mqtt/publish", Req) ->
handle_req('POST', "/mqtt/publish", Req) ->
Params = mochiweb_request:parse_post(Req),
lager:info("HTTP Publish: ~p~n", [Params]),
lager:info("HTTP Publish: ~p", [Params]),
case authorized(Req) of
true ->
Qos = int(get_value("qos", Params, "0")),
@ -64,29 +64,28 @@ handle('POST', "/mqtt/publish", Req) ->
Req:respond({401, [], <<"Fobbiden">>})
end;
handle(_Method, "/mqtt/wsocket", Req) ->
lager:info("Websocket Headers: ~p~n", [Req:get(headers)]),
handle_req(_Method, "/mqtt/wsocket", Req) ->
lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
Up = Req:get_header_value("Upgrade"),
case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of
true ->
emqttd_websocket:init(Req);
emqttd_websocket:start_link(Req);
false ->
Req:respond({400, [], <<"Bad Request">>})
end;
handle('GET', "/" ++ File, Req) ->
lager:info("GET File: ~s", [File]),
handle_req('GET', "/" ++ File, Req) ->
lager:info("HTTP GET File: ~s", [File]),
mochiweb_request:serve_file(File, docroot(), Req);
handle(_Method, _Path, Req) ->
handle_req(_Method, _Path, Req) ->
Req:not_found().
%%------------------------------------------------------------------------------
%% basic authorization
%%------------------------------------------------------------------------------
authorized(Req) ->
case mochiweb_request:get_header_value("Authorization", Req) of
case Req:get_header_value("Authorization") of
undefined ->
false;
"Basic " ++ BasicAuth ->

View File

@ -24,19 +24,39 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_websocket).
-export([init/1, loop/3]).
-export([start_link/1, init/1, loop/3]).
-record(state, {}).
-record(state, {request,
peername,
parse_state,
proto_state,
%packet_opts,
keepalive}).
init(Req) ->
{ReentryWs, _ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:loop/3),
ReentryWs(#state{}).
-record(client_state, {request, sender}).
loop(Payload, State, ReplyChannel) ->
-define(PACKET_OPTS, [{max_clientid_len, 1024},
{max_packet_size, 4096}]).
start_link(Req) ->
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:loop/3),
{ok, Client} = gen_server:start_link(?MODULE, [Req, ReplyChannel], []),
ReentryWs(#state{client = Client,
parse_state = emqtt_parser:init(?PACKET_OPTS)}).
init([Req, ReplyChannel]) ->
{ok,
peername = Req:get_header_value(peername),
%ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
loop(Payload, State = #state{parse_state = ParserState}, ReplyChannel) ->
io:format("Received data: ~p~n", [Payload]),
ReplyChannel(Payload),
State.