From 88c2b4eaa3cfe2d523fbe896aaaec66dd8484980 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Feb 2017 15:43:24 +0800 Subject: [PATCH] Use the new emqttd_parser API to parse Websocket frame --- src/emqttd_ws.erl | 48 +++++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index 8c483344e..b292c39bc 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -18,13 +18,18 @@ -author("Feng Lee "). +-include("emqttd_protocol.hrl"). + +-import(proplists, [get_value/3]). + -export([handle_request/1, ws_loop/3]). %% WebSocket Loop State --record(wsocket_state, {peer, client_pid, packet_opts, parser_fun}). +-record(wsocket_state, {peername, client_pid, max_packet_size, parser}). --define(WSLOG(Level, Peer, Format, Args), - lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). +-define(WSLOG(Level, Format, Args, State), + lager:Level("WsClient(~s): " ++ Format, + [esockd_net:format(State#wsocket_state.peername) | Args])). %%-------------------------------------------------------------------- %% Handle WebSocket Request @@ -32,18 +37,14 @@ %% @doc Handle WebSocket Request. handle_request(Req) -> - Peer = Req:get(peer), - {ok, PktOpts} = emqttd:env(protocol), - ParserFun = emqttd_parser:new(PktOpts), - {ReentryWs, ReplyChannel} = upgrade(Req), + {ok, Env} = emqttd:env(protocol), + PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), + Parser = emqttd_parser:initial_state(PacketSize), + %% Upgrade WebSocket. + {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), - ReentryWs(#wsocket_state{peer = Peer, client_pid = ClientPid, - packet_opts = PktOpts, parser_fun = ParserFun}). - -%% @doc Upgrade WebSocket. -%% @private -upgrade(Req) -> - mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3). + ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser, + max_packet_size = PacketSize, client_pid = ClientPid}). %%-------------------------------------------------------------------- %% Receive Loop @@ -54,25 +55,24 @@ ws_loop(<<>>, State, _ReplyChannel) -> State; ws_loop([<<>>], State, _ReplyChannel) -> State; -ws_loop(Data, State = #wsocket_state{peer = Peer, client_pid = ClientPid, - parser_fun = ParserFun}, ReplyChannel) -> - ?WSLOG(debug, Peer, "RECV ~p", [Data]), +ws_loop(Data, State = #wsocket_state{client_pid = ClientPid, parser = Parser}, ReplyChannel) -> + ?WSLOG(debug, "RECV ~p", [Data], State), emqttd_metrics:inc('bytes/received', iolist_size(Data)), - case catch ParserFun(iolist_to_binary(Data)) of + case catch emqttd_parser:parse(iolist_to_binary(Data), Parser) of {more, NewParser} -> - State#wsocket_state{parser_fun = NewParser}; + State#wsocket_state{parser = NewParser}; {ok, Packet, Rest} -> gen_server:cast(ClientPid, {received, Packet}), ws_loop(Rest, reset_parser(State), ReplyChannel); {error, Error} -> - ?WSLOG(error, Peer, "Frame error: ~p", [Error]), + ?WSLOG(error, "Frame error: ~p", [Error], State), exit({shutdown, Error}); {'EXIT', Reason} -> - ?WSLOG(error, Peer, "Frame error: ~p", [Reason]), - ?WSLOG(error, Peer, "Error data: ~p", [Data]), + ?WSLOG(error, "Frame error: ~p", [Reason], State), + ?WSLOG(error, "Error data: ~p", [Data], State), exit({shutdown, parser_error}) end. -reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}. +reset_parser(State = #wsocket_state{max_packet_size = PacketSize}) -> + State#wsocket_state{parser = emqttd_parser:initial_state(PacketSize)}.