0.13 refactor

This commit is contained in:
Feng 2015-10-31 18:43:04 +08:00
parent 3f41a6c241
commit cf95c5e1b9
5 changed files with 104 additions and 86 deletions

View File

@ -177,7 +177,7 @@ handle_info(activate_sock, State) ->
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
Size = size(Data), Size = size(Data),
?LOG(debug, "RECV <- ~p", [Data], State), ?LOG(debug, "RECV ~p", [Data], State),
emqttd_metrics:inc('bytes/received', Size), emqttd_metrics:inc('bytes/received', Size),
received(Data, rate_limit(Size, State#client_state{await_recv = false})); received(Data, rate_limit(Size, State#client_state{await_recv = false}));

View File

@ -24,7 +24,6 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_message). -module(emqttd_message).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -170,14 +169,12 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message %% @doc Format MQTT Message
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
format(#mqtt_message{msgid=MsgId, format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From,
pktid = PktId, qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
from = From, io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
qos=Qos, [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]).
retain=Retain,
dup=Dup, i(true) -> 1;
topic=Topic}) -> i(false) -> 0;
io_lib:format("Message(MsgId=~p, PktId=~p, from=~s, " i(I) when is_integer(I) -> I.
"Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[MsgId, PktId, From, Qos, Retain, Dup, Topic]).

View File

@ -263,7 +263,7 @@ send(Packet, State = #proto_state{sendfun = SendFun})
trace(send, Packet, State), trace(send, Packet, State),
emqttd_metrics:sent(Packet), emqttd_metrics:sent(Packet),
Data = emqttd_serialiser:serialise(Packet), Data = emqttd_serialiser:serialise(Packet),
?LOG(debug, "SENT -> ~p", [Data], State), ?LOG(debug, "SEND ~p", [Data], State),
emqttd_metrics:inc('bytes/sent', size(Data)), emqttd_metrics:inc('bytes/sent', size(Data)),
SendFun(Data), SendFun(Data),
{ok, State}. {ok, State}.

View File

@ -98,7 +98,7 @@ retain(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}) ->
case {TabSize < limit(table), size(Payload) < limit(payload)} of case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} -> {true, true} ->
Retained = #mqtt_retained{topic = Topic, message = Msg}, Retained = #mqtt_retained{topic = Topic, message = Msg},
lager:debug("Retained ~s", [emqttd_message:format(Msg)]), lager:debug("RETAIN ~s", [emqttd_message:format(Msg)]),
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
{false, _}-> {false, _}->

View File

@ -24,7 +24,6 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_ws_client). -module(emqttd_ws_client).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -46,10 +45,13 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% WebSocket Loop State %% WebSocket Loop State
-record(wsocket_state, {request, client_pid, packet_opts, parser}). -record(wsocket_state, {request, client_pid, packet_opts, parser_fun}).
%% Client State %% WebSocket Client State
-record(client_state, {ws_pid, request, proto_state, keepalive}). -record(wsclient_state, {ws_pid, request, proto_state, keepalive}).
-define(WSLOG(Level, Format, Args, Req),
lager:Level("WsClient(~s): " ++ Format, [Req:get(peer) | Args])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start WebSocket client. %% @doc Start WebSocket client.
@ -57,12 +59,14 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_link(Req) -> start_link(Req) ->
PktOpts = emqttd:env(mqtt, packet), PktOpts = emqttd:env(mqtt, packet),
ParserFun = emqttd_parser:new(PktOpts),
{ReentryWs, ReplyChannel} = upgrade(Req), {ReentryWs, ReplyChannel} = upgrade(Req),
{ok, ClientPid} = gen_server:start_link(?MODULE, [self(), Req, ReplyChannel, PktOpts], []), Params = [self(), Req, ReplyChannel, PktOpts],
{ok, ClientPid} = gen_server:start_link(?MODULE, Params, []),
ReentryWs(#wsocket_state{request = Req, ReentryWs(#wsocket_state{request = Req,
client_pid = ClientPid, client_pid = ClientPid,
packet_opts = PktOpts, packet_opts = PktOpts,
parser = emqttd_parser:new(PktOpts)}). parser_fun = ParserFun}).
session(CPid) -> session(CPid) ->
gen_server:call(CPid, session, infinity). gen_server:call(CPid, session, infinity).
@ -97,22 +101,25 @@ ws_loop([<<>>], State, _ReplyChannel) ->
State; State;
ws_loop(Data, State = #wsocket_state{request = Req, ws_loop(Data, State = #wsocket_state{request = Req,
client_pid = ClientPid, client_pid = ClientPid,
parser = Parser}, ReplyChannel) -> parser_fun = ParserFun}, ReplyChannel) ->
Peer = Req:get(peer), ?WSLOG(debug, "RECV ~p", [Data], Req),
lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]), case catch ParserFun(iolist_to_binary(Data)) of
case Parser(iolist_to_binary(Data)) of
{more, NewParser} -> {more, NewParser} ->
State#wsocket_state{parser = NewParser}; State#wsocket_state{parser_fun = NewParser};
{ok, Packet, Rest} -> {ok, Packet, Rest} ->
gen_server:cast(ClientPid, {received, Packet}), gen_server:cast(ClientPid, {received, Packet}),
ws_loop(Rest, reset_parser(State), ReplyChannel); ws_loop(Rest, reset_parser(State), ReplyChannel);
{error, Error} -> {error, Error} ->
lager:error("MQTT(WebSocket) frame error ~p for connection ~s", [Error, Peer]), ?WSLOG(error, "Frame error: ~p", [Error], Req),
exit({shutdown, Error}) exit({shutdown, Error});
{'EXIT', Reason} ->
?WSLOG(error, "Frame error: ~p", [Reason], Req),
?WSLOG(error, "Error data: ~p", [Data], Req),
exit({shutdown, parser_error})
end. end.
reset_parser(State = #wsocket_state{packet_opts = PktOpts}) -> reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
State#wsocket_state{parser = emqttd_parser:new(PktOpts)}. State#wsocket_state{parser_fun = emqttd_parser:new(PktOpts)}.
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -126,97 +133,104 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
HeadersList = mochiweb_headers:to_list(Headers), HeadersList = mochiweb_headers:to_list(Headers),
ProtoState = emqttd_protocol:init(Peername, SendFun, ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, HeadersList} | PktOpts]), [{ws_initial_headers, HeadersList} | PktOpts]),
{ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}. {ok, #wsclient_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; {reply, emqttd_protocol:session(ProtoState), State};
handle_call(info, _From, State = #client_state{request = Req, handle_call(info, _From, State = #wsclient_state{request = Req,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
{reply, [{websocket, true}, {peer, Req:get(peer)} ProtoInfo = emqttd_protocol:info(ProtoState),
| emqttd_protocol:info(ProtoState)], State}; {reply, [{websocket, true}, {peer, Req:get(peer)}| ProtoInfo], State};
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(_Req, _From, State) -> handle_call(Req, _From, State = #wsclient_state{request = HttpReq}) ->
{reply, error, State}. ?WSLOG(critical, "Unexpected request: ~p", [Req], HttpReq),
{reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) -> handle_cast({subscribe, TopicTable}, State) ->
with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); with_session(fun(SessPid) ->
emqttd_session:subscribe(SessPid, TopicTable)
end, State);
handle_cast({unsubscribe, Topics}, State) -> handle_cast({unsubscribe, Topics}, State) ->
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); with_session(fun(SessPid) ->
emqttd_session:unsubscribe(SessPid, Topics)
end, State);
handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) -> handle_cast({received, Packet}, State = #wsclient_state{request = Req,
proto_state = ProtoState}) ->
case emqttd_protocol:received(Packet, ProtoState) of case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
noreply(State#client_state{proto_state = ProtoState1}); noreply(State#wsclient_state{proto_state = ProtoState1});
{error, Error} -> {error, Error} ->
lager:error("MQTT protocol error ~p", [Error]), ?WSLOG(error, "Protocol error - ~p", [Error], Req),
stop({shutdown, Error}, State); shutdown(Error, State);
{error, Error, ProtoState1} -> {error, Error, ProtoState1} ->
stop({shutdown, Error}, State#client_state{proto_state = ProtoState1}); shutdown(Error, State#wsclient_state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} -> {stop, Reason, ProtoState1} ->
stop(Reason, State#client_state{proto_state = ProtoState1}) stop(Reason, State#wsclient_state{proto_state = ProtoState1})
end; end;
handle_cast(_Msg, State) -> handle_cast(Msg, State = #wsclient_state{request = Req}) ->
?WSLOG(critical, "Unexpected msg: ~p", [Msg], Req),
{noreply, State}. {noreply, State}.
%% Asynchronous SUBACK handle_info({suback, PacketId, GrantedQos}, State) ->
handle_info({suback, PacketId, GrantedQos}, State = #client_state{proto_state = ProtoState}) -> with_proto_state(fun(ProtoState) ->
{ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), Packet = ?SUBACK_PACKET(PacketId, GrantedQos),
noreply(State#client_state{proto_state = ProtoState1}); emqttd_protocol:send(Packet, ProtoState)
end, State);
handle_info({deliver, Message}, State = #client_state{proto_state = ProtoState}) -> handle_info({deliver, Message}, State) ->
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), with_proto_state(fun(ProtoState) ->
noreply(State#client_state{proto_state = ProtoState1}); emqttd_protocol:send(Message, ProtoState)
end, State);
handle_info({redeliver, {?PUBREL, PacketId}}, State = #client_state{proto_state = ProtoState}) -> handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), with_proto_state(fun(ProtoState) ->
noreply(State#client_state{proto_state = ProtoState1}); emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
end, State);
handle_info({stop, duplicate_id, _NewPid}, State = #client_state{proto_state = ProtoState}) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{request = Req}) ->
lager:error("Shutdown for duplicate clientid: ~s", [emqttd_protocol:clientid(ProtoState)]), ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], Req),
stop({shutdown, duplicate_id}, State); shutdown(confict, State);
handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}) -> handle_info({keepalive, start, Interval}, State = #wsclient_state{request = Req}) ->
lager:debug("Client(WebSocket) ~s: Start KeepAlive with ~p seconds", [Req:get(peer), TimeoutSec]), ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], Req),
Socket = Req:get(socket), Conn = Req:get(connection),
StatFun = fun() -> StatFun = fun() ->
case esockd_transport:getstat(Socket, [recv_oct]) of case Conn:getstat([recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
{error, Error} -> {error, Error} {error, Error} -> {error, Error}
end end
end, end,
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
noreply(State#client_state{keepalive = KeepAlive}); noreply(State#wsclient_state{keepalive = KeepAlive});
handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) -> handle_info({keepalive, check}, State = #wsclient_state{request = Req,
keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
noreply(State#client_state{keepalive = KeepAlive1}); noreply(State#wsclient_state{keepalive = KeepAlive1});
{error, timeout} -> {error, timeout} ->
lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]), ?WSLOG(debug, "Keepalive Timeout!", [], Req),
stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined}); shutdown(keepalive_timeout, State);
{error, Error} -> {error, Error} ->
lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]), ?WSLOG(warning, "Keepalive error - ~p", [Error], Req),
stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined}) shutdown(keepalive_error, State)
end; end;
handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{ws_pid = WsPid}) ->
proto_state = ProtoState}) -> stop(Reason, State);
ClientId = emqttd_protocol:clientid(ProtoState),
lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]),
stop({shutdown, websocket_closed}, State);
handle_info(Info, State = #client_state{request = Req}) -> handle_info(Info, State = #wsclient_state{request = Req}) ->
lager:error("Client(WebSocket) ~s: Unexpected Info - ~p", [Req:get(peer), Info]), ?WSLOG(error, "Unexpected Info: ~p", [Info], Req),
noreply(State). noreply(State).
terminate(Reason, #client_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
lager:info("WebSocket client terminated: ~p", [Reason]),
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
case Reason of case Reason of
{shutdown, Error} -> {shutdown, Error} ->
@ -232,12 +246,19 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
noreply(State#wsclient_state{proto_state = ProtoState1}).
with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
noreply(State) -> noreply(State) ->
{noreply, State, hibernate}. {noreply, State, hibernate}.
shutdown(Reason, State) ->
stop({shutdown, Reason}, State).
stop(Reason, State ) -> stop(Reason, State ) ->
{stop, Reason, State}. {stop, Reason, State}.
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
Fun(emqttd_protocol:session(ProtoState)), noreply(State).