From cf95c5e1b9969d881aa09e65f14a1aeb6d373e4f Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 31 Oct 2015 18:43:04 +0800 Subject: [PATCH] 0.13 refactor --- src/emqttd_client.erl | 2 +- src/emqttd_message.erl | 19 ++--- src/emqttd_protocol.erl | 2 +- src/emqttd_retained.erl | 2 +- src/emqttd_ws_client.erl | 165 ++++++++++++++++++++++----------------- 5 files changed, 104 insertions(+), 86 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 40b8a59cd..f471d311b 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -177,7 +177,7 @@ handle_info(activate_sock, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = size(Data), - ?LOG(debug, "RECV <- ~p", [Data], State), + ?LOG(debug, "RECV ~p", [Data], State), emqttd_metrics:inc('bytes/received', Size), received(Data, rate_limit(Size, State#client_state{await_recv = false})); diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index d8e55e202..9d340b2b3 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_message). -author("Feng Lee "). @@ -170,14 +169,12 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message %% @end %%------------------------------------------------------------------------------ -format(#mqtt_message{msgid=MsgId, - pktid = PktId, - from = From, - qos=Qos, - retain=Retain, - dup=Dup, - topic=Topic}) -> - io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, " - "Qos=~p, Retain=~s, Dup=~s, Topic=~s)", - [MsgId, PktId, From, Qos, Retain, Dup, Topic]). +format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, + qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> + io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)", + [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]). + +i(true) -> 1; +i(false) -> 0; +i(I) when is_integer(I) -> I. diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index d0af0d420..954332afe 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -263,7 +263,7 @@ send(Packet, State = #proto_state{sendfun = SendFun}) trace(send, Packet, State), emqttd_metrics:sent(Packet), Data = emqttd_serialiser:serialise(Packet), - ?LOG(debug, "SENT -> ~p", [Data], State), + ?LOG(debug, "SEND ~p", [Data], State), emqttd_metrics:inc('bytes/sent', size(Data)), SendFun(Data), {ok, State}. diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index 8e797255b..685006200 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -98,7 +98,7 @@ retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) -> case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> Retained = #mqtt_retained{topic = Topic, message = Msg}, - lager:debug("Retained ~s", [emqttd_message:format(Msg)]), + lager:debug("RETAIN ~s", [emqttd_message:format(Msg)]), mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); {false, _}-> diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index b4f82c043..1d02ab46c 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_ws_client). -author("Feng Lee "). @@ -46,10 +45,13 @@ terminate/2, code_change/3]). %% WebSocket Loop State --record(wsocket_state, {request, client_pid, packet_opts, parser}). +-record(wsocket_state, {request, client_pid, packet_opts, parser_fun}). -%% Client State --record(client_state, {ws_pid, request, proto_state, keepalive}). +%% WebSocket Client State +-record(wsclient_state, {ws_pid, request, proto_state, keepalive}). + +-define(WSLOG(Level, Format, Args, Req), + lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])). %%------------------------------------------------------------------------------ %% @doc Start WebSocket client. @@ -57,12 +59,14 @@ %%------------------------------------------------------------------------------ start_link(Req) -> PktOpts = emqttd:env(mqtt, packet), + ParserFun = emqttd_parser:new(PktOpts), {ReentryWs, ReplyChannel} = upgrade(Req), - {ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []), - ReentryWs(#wsocket_state{request = Req, - client_pid = ClientPid, - packet_opts = PktOpts, - parser = emqttd_parser:new(PktOpts)}). + Params = [self(), Req, ReplyChannel, PktOpts], + {ok, ClientPid} = gen_server:start_link(?MODULE, Params, []), + ReentryWs(#wsocket_state{request = Req, + client_pid = ClientPid, + packet_opts = PktOpts, + parser_fun = ParserFun}). session(CPid) -> gen_server:call(CPid, session, infinity). @@ -97,22 +101,25 @@ ws_loop([<<>>], State, _ReplyChannel) -> State; ws_loop(Data, State = #wsocket_state{request = Req, client_pid = ClientPid, - parser = Parser}, ReplyChannel) -> - Peer = Req:get(peer), - lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), - case Parser(iolist_to_binary(Data)) of + parser_fun = ParserFun}, ReplyChannel) -> + ?WSLOG(debug, "RECV ~p", [Data], Req), + case catch ParserFun(iolist_to_binary(Data)) of {more, NewParser} -> - State#wsocket_state{parser = NewParser}; + State#wsocket_state{parser_fun = NewParser}; {ok, Packet, Rest} -> gen_server:cast(ClientPid, {received, Packet}), ws_loop(Rest, reset_parser(State), ReplyChannel); {error, Error} -> - lager:error("MQTT(WebSocket) frame error ~p for connection ~s", [Error, Peer]), - exit({shutdown, Error}) + ?WSLOG(error, "Frame error: ~p", [Error], Req), + exit({shutdown, Error}); + {'EXIT', Reason} -> + ?WSLOG(error, "Frame error: ~p", [Reason], Req), + ?WSLOG(error, "Error data: ~p", [Data], Req), + exit({shutdown, parser_error}) end. reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> - State#wsocket_state{parser = emqttd_parser:new(PktOpts)}. + State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}. %%%============================================================================= %%% gen_server callbacks @@ -125,98 +132,105 @@ init([WsPid, Req, ReplyChannel, PktOpts]) -> Headers = mochiweb_request:get(headers, Req), HeadersList = mochiweb_headers:to_list(Headers), ProtoState = emqttd_protocol:init(Peername, SendFun, - [{ws_initial_headers, HeadersList}|PktOpts]), - {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. + [{ws_initial_headers, HeadersList} | PktOpts]), + {ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. -handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> +handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> {reply, emqttd_protocol:session(ProtoState), State}; -handle_call(info, _From, State = #client_state{request = Req, - proto_state = ProtoState}) -> - {reply, [{websocket, true}, {peer, Req:get(peer)} - | emqttd_protocol:info(ProtoState)], State}; +handle_call(info, _From, State = #wsclient_state{request = Req, + proto_state = ProtoState}) -> + ProtoInfo = emqttd_protocol:info(ProtoState), + {reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; -handle_call(_Req, _From, State) -> - {reply, error, State}. +handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) -> + ?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq), + {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + with_session(fun(SessPid) -> + emqttd_session:subscribe(SessPid, TopicTable) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); + with_session(fun(SessPid) -> + emqttd_session:unsubscribe(SessPid, Topics) + end, State); -handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> +handle_cast({received, Packet}, State = #wsclient_state{request = Req, + proto_state = ProtoState}) -> case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - noreply(State#client_state{proto_state = ProtoState1}); + noreply(State#wsclient_state{proto_state = ProtoState1}); {error, Error} -> - lager:error("MQTT protocol error ~p", [Error]), - stop({shutdown, Error}, State); + ?WSLOG(error, "Protocol error - ~p", [Error], Req), + shutdown(Error, State); {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); + shutdown(Error, State#wsclient_state{proto_state = ProtoState1}); {stop, Reason, ProtoState1} -> - stop(Reason, State#client_state{proto_state = ProtoState1}) + stop(Reason, State#wsclient_state{proto_state = ProtoState1}) end; -handle_cast(_Msg, State) -> +handle_cast(Msg, State = #wsclient_state{request = Req}) -> + ?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req), {noreply, State}. -%% Asynchronous SUBACK -handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), - noreply(State#client_state{proto_state = ProtoState1}); +handle_info({suback, PacketId, GrantedQos}, State) -> + with_proto_state(fun(ProtoState) -> + Packet = ?SUBACK_PACKET(PacketId, GrantedQos), + emqttd_protocol:send(Packet, ProtoState) + end, State); -handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - noreply(State#client_state{proto_state = ProtoState1}); +handle_info({deliver, Message}, State) -> + with_proto_state(fun(ProtoState) -> + emqttd_protocol:send(Message, ProtoState) + end, State); -handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - noreply(State#client_state{proto_state = ProtoState1}); +handle_info({redeliver, {?PUBREL, PacketId}}, State) -> + with_proto_state(fun(ProtoState) -> + emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) + end, State); -handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> - lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), - stop({shutdown, duplicate_id}, State); +handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) -> + ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req), + shutdown(confict, State); -handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> - lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), - Socket = Req:get(socket), +handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) -> + ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req), + Conn = Req:get(connection), StatFun = fun() -> - case esockd_transport:getstat(Socket, [recv_oct]) of + case Conn:getstat([recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {error, Error} -> {error, Error} end end, - KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), - noreply(State#client_state{keepalive = KeepAlive}); + KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), + noreply(State#wsclient_state{keepalive = KeepAlive}); -handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> +handle_info({keepalive, check}, State = #wsclient_state{request = Req, + keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - noreply(State#client_state{keepalive = KeepAlive1}); + noreply(State#wsclient_state{keepalive = KeepAlive1}); {error, timeout} -> - lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), - stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); + ?WSLOG(debug, "Keepalive Timeout!", [], Req), + shutdown(keepalive_timeout, State); {error, Error} -> - lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), - stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) + ?WSLOG(warning, "Keepalive error - ~p", [Error], Req), + shutdown(keepalive_error, State) end; -handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, - proto_state = ProtoState}) -> - ClientId = emqttd_protocol:clientid(ProtoState), - lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]), - stop({shutdown, websocket_closed}, State); +handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) -> + stop(Reason, State); -handle_info(Info, State = #client_state{request = Req}) -> - lager:error("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), +handle_info(Info, State = #wsclient_state{request = Req}) -> + ?WSLOG(error, "Unexpected Info: ~p", [Info], Req), noreply(State). -terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> - lager:info("WebSocket client terminated: ~p", [Reason]), +terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> emqttd_keepalive:cancel(KeepAlive), case Reason of {shutdown, Error} -> @@ -232,12 +246,19 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= +with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) -> + {ok, ProtoState1} = Fun(ProtoState), + noreply(State#wsclient_state{proto_state = ProtoState1}). + +with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), noreply(State). + noreply(State) -> {noreply, State, hibernate}. +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + stop(Reason, State ) -> {stop, Reason, State}. -with_session(Fun, State = #client_state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), noreply(State). -