From f4cf90ae91d18538db261538c6c917e81fad11f3 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 30 Oct 2015 16:00:11 +0800 Subject: [PATCH] refactor client --- rebar.config | 4 +- src/emqttd_client.erl | 314 +++++++++++++++++++++--------------------- 2 files changed, 161 insertions(+), 157 deletions(-) diff --git a/rebar.config b/rebar.config index 9dcb12ee6..4b39a2f2b 100644 --- a/rebar.config +++ b/rebar.config @@ -29,8 +29,8 @@ {deps, [ {gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}}, {lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}}, - {esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}}, - {mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "master"}}} + {esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "3.0"}}}, + {mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "4.0"}}} ]}. {recursive_cmds, [ct, eunit, clean]}. diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 247fe7500..e8a143734 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -24,7 +24,6 @@ %%% %%% @end %%%----------------------------------------------------------------------------- - -module(emqttd_client). -author("Feng Lee "). @@ -33,40 +32,35 @@ -include("emqttd_protocol.hrl"). +-include("emqttd_internel.hrl"). + +-behaviour(gen_server). + %% API Function Exports -export([start_link/2, session/1, info/1, kick/1]). -%% SUB/UNSUB Asynchronously +%% SUB/UNSUB Asynchronously, called by plugins. -export([subscribe/2, unsubscribe/2]). --behaviour(gen_server). - %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -%% Client State... --record(state, {transport, - socket, - peername, - conn_name, - await_recv, - conn_state, - rate_limiter, - parser, - proto_state, - packet_opts, - keepalive}). +%% Client State +-record(client_state, {connection, peername, peerhost, peerport, + await_recv, conn_state, rate_limit, + parser_fun, proto_state, packet_opts, + keepalive}). --define(DEBUG(Format, Args, State), - lager:debug("Client(~s): " ++ Format, - [emqttd_net:format(State#state.peername) | Args])). --define(ERROR(Format, Args, State), - lager:error("Client(~s): " ++ Format, - [emqttd_net:format(State#state.peername) | Args])). +-define(INFO_KEYS, [peername, peerhost, peerport, await_recv, conn_state]). -start_link(SockArgs, MqttEnv) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}. +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). + +-define(LOG(Level, Format, Args, State), + lager:Level("Client(~s): " ++ Format, [State#client_state.peername | Args])). + +start_link(Connection, MqttEnv) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Connection, MqttEnv]])}. session(CPid) -> gen_server:call(CPid, session, infinity). @@ -83,140 +77,156 @@ subscribe(CPid, TopicTable) -> unsubscribe(CPid, Topics) -> gen_server:cast(CPid, {unsubscribe, Topics}). -init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> - % Transform if ssl. - {ok, NewSock} = esockd_connection:accept(SockArgs), - %%TODO:... - {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), - io:format("~p~n", [BufSizes]), - {ok, Peername} = emqttd_net:peername(Sock), - {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), - SendFun = send_fun(Transport, NewSock), - PktOpts = proplists:get_value(packet, MqttEnv), - ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), - Limiter = proplists:get_value(rate_limiter, MqttEnv), - State = run_socket(#state{transport = Transport, - socket = NewSock, - peername = Peername, - conn_name = ConnStr, - await_recv = false, - conn_state = running, - rate_limiter = Limiter, - packet_opts = PktOpts, - parser = emqttd_parser:new(PktOpts), - proto_state = ProtoState}), +init([Connection0, MqttEnv]) -> + {ok, Connection} = Connection0:wait(), + {PeerHost, PeerPort, PeerName} = + case Connection:peername() of + {ok, {Host, Port}} -> + {Host, Port, esockd_net:format({Host, Port})}; + {error, enotconn} -> + Connection:fast_close(), + exit(normal); + {error, Reason} -> + Connection:fast_close(), + exit({shutdown, Reason}) + end, + SendFun = fun(Data) -> + try Connection:async_send(Data) of + true -> ok + catch + error:Error -> exit({shutdown, Error}) + end + end, + PktOpts = proplists:get_value(packet, MqttEnv), + ParserFun = emqttd_parser:new(PktOpts), + ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts), + RateLimit = proplists:get_value(rate_limit, Connection:opts()), + State = run_socket(#client_state{connection = Connection, + peername = PeerName, + peerhost = PeerHost, + peerport = PeerPort, + await_recv = false, + conn_state = running, + rate_limit = RateLimit, + parser_fun = ParserFun, + proto_state = ProtoState, + packet_opts = PktOpts}), ClientOpts = proplists:get_value(client, MqttEnv), IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10), gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)). -handle_call(session, _From, State = #state{proto_state = ProtoState}) -> +handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> {reply, emqttd_protocol:session(ProtoState), State}; -handle_call(info, _From, State = #state{conn_name = ConnName, - proto_state = ProtoState}) -> - {reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State}; +handle_call(info, _From, State = #client_state{connection = Connection, + proto_state = ProtoState}) -> + ClientInfo = [{Key, Val} || {Key, Val} <- ?record_to_proplist(client_state, State), lists:member(Key, ?INFO_KEYS)], + ProtoInfo = emqttd_protocol:info(ProtoState), + {ok, SockStats} = Connection:getstat(?SOCK_STATS), + Info = lists:append([ClientInfo, [{proto_info, ProtoInfo}, {sock_stats, SockStats}]]), + {reply, Info, State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call(Req, _From, State) -> - ?ERROR("Unexpected request: ~p", [Req], State), - {reply, {error, unsupported_request}, State}. + ?LOG(critical, "Unexpected request: ~p", [Req], State), + {reply, {error, unsupported_request}, State}. handle_cast({subscribe, TopicTable}, State) -> - with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State); + with_session(fun(SessPid) -> + emqttd_session:subscribe(SessPid, TopicTable) + end, State); handle_cast({unsubscribe, Topics}, State) -> - with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State); + with_session(fun(SessPid) -> + emqttd_session:unsubscribe(SessPid, Topics) + end, State); handle_cast(Msg, State) -> - ?ERROR("Unexpected msg: ~p",[Msg], State), - {noreply, State}. + ?LOG(critical, "Unexpected msg: ~p", [Msg], State), + noreply(State). handle_info(timeout, State) -> - stop({shutdown, timeout}, State); + shutdown(idle_timeout, State); %% Asynchronous SUBACK -handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_info({suback, PacketId, GrantedQos}, State) -> + with_proto_state(fun(ProtoState) -> + Packet = ?SUBACK_PACKET(PacketId, GrantedQos), + emqttd_protocol:send(Packet, ProtoState) + end, State); -handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_info({deliver, Message}, State) -> + with_proto_state(fun(ProtoState) -> + emqttd_protocol:send(Message, ProtoState) + end, State); -handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) -> - {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), - noreply(State#state{proto_state = ProtoState1}); +handle_info({redeliver, {?PUBREL, PacketId}}, State) -> + with_proto_state(fun(ProtoState) -> + emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) + end, State); -handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState, - conn_name = ConnName}) -> - lager:warning("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:clientid(ProtoState), ConnName]), - stop({shutdown, duplicate_id}, State); +handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> + ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), + shutdown(confict, State); handle_info(activate_sock, State) -> - noreply(run_socket(State#state{conn_state = running})); + noreply(run_socket(State#client_state{conn_state = running})); -handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> +handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = size(Data), - lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), + ?LOG(debug, "RECV: ~p", [Data], State), emqttd_metrics:inc('bytes/received', Size), - received(Data, rate_limit(Size, State#state{await_recv = false})); + received(Data, rate_limit(Size, State#client_state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> - %%TODO: ... - network_error(Reason, State); + shutdown(Reason, State); -handle_info({inet_reply, _Ref, ok}, State) -> - %%TODO: ok... - io:format("inet_reply ok~n"), +handle_info({inet_reply, _Sock, ok}, State) -> noreply(State); handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - network_error(Reason, State); + shutdown(Reason, State); -handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> - ?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State), +handle_info({keepalive, start, Interval}, State = #client_state{connection = Connection}) -> + ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), StatFun = fun() -> - case Transport:getstat(Socket, [recv_oct]) of - {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; - {error, Error} -> {error, Error} - end - end, - KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}), - noreply(State#state{keepalive = KeepAlive}); + case Connection:getstat([recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end, + KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), + noreply(State#client_state{keepalive = KeepAlive}); -handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> +handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> noreply(State#state{keepalive = KeepAlive1}); {error, timeout} -> - ?DEBUG("Keepalive Timeout!", [], State), - stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); + ?LOG(debug, "Keepalive timeout", [], State), + shutdown(keepalive_timeout, State); {error, Error} -> - ?DEBUG("Keepalive Error - ~p", [Error], State), - stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) + ?LOG(warning, "Keepalive error - ~p", [Error], State), + shutdown(Error, State) end; handle_info(Info, State) -> - ?ERROR("Unexpected info: ~p", [Info], State), - {noreply, State}. + ?LOG(critical, "Unexpected info: ~p", [Info], State), + noreply(State). -terminate(Reason, #state{transport = Transport, - socket = Socket, - keepalive = KeepAlive, - proto_state = ProtoState}) -> +terminate(Reason, #client_state{connection = Connection, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + Connection:fast_close(), emqttd_keepalive:cancel(KeepAlive), - if - Reason == {shutdown, conn_closed} -> ok; - true -> Transport:fast_close(Socket) - end, case {ProtoState, Reason} of - {undefined, _} -> ok; + {undefined, _} -> + ok; {_, {shutdown, Error}} -> emqttd_protocol:shutdown(Error, ProtoState); - {_, Reason} -> + {_, Reason} -> emqttd_protocol:shutdown(Reason, ProtoState) end. @@ -227,79 +237,73 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -noreply(State) -> - {noreply, State, hibernate}. +with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) -> + {ok, ProtoState1} = Fun(ProtoState), + noreply(State#client_state{proto_state = ProtoState1}). -stop(Reason, State) -> - {stop, Reason, State}. - -with_session(Fun, State = #state{proto_state = ProtoState}) -> - Fun(emqttd_protocol:session(ProtoState)), noreply(State). +with_session(Fun, State = #client_state{proto_state = ProtoState}) -> + Fun(emqttd_protocol:session(ProtoState)), + noreply(State). %% receive and parse tcp data received(<<>>, State) -> noreply(State); -received(Bytes, State = #state{parser = Parser, - packet_opts = PacketOpts, - proto_state = ProtoState}) -> - case catch Parser(Bytes) of +received(Bytes, State = #client_state{parser_fun = ParserFun, + packet_opts = PacketOpts, + proto_state = ProtoState}) -> + case catch ParserFun(Bytes) of {more, NewParser} -> - noreply(run_socket(State#state{parser = NewParser})); + noreply(run_socket(State#client_state{parser_fun = NewParser})); {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), - proto_state = ProtoState1}); + received(Rest, State#client_state{parser_fun = emqttd_parser:new(PacketOpts), + proto_state = ProtoState1}); {error, Error} -> - ?ERROR("Protocol error - ~p", [Error], State), - stop({shutdown, Error}, State); + ?LOG(error, "Protocol error - ~p", [Error], State), + shutdown(Error, State); {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + shutdown(Error, State#client_state{proto_state = ProtoState1}); {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + stop(Reason, State#client_state{proto_state = ProtoState1}) end; {error, Error} -> - ?ERROR("Framing error - ~p", [Error], State), - stop({shutdown, Error}, State); + ?LOG(error, "Framing error - ~p", [Error], State), + shutdown(Error, State); {'EXIT', Reason} -> - ?ERROR("Parser failed for ~p~nError Frame: ~p", [Reason, Bytes], State), - {stop, {shutdown, frame_error}, State} + ?LOG(error, "Parser failed for ~p", [Reason], State), + ?LOG(error, "Error data: ~p", [Bytes], State), + shutdown(parser_error, State) end. -network_error(Reason, State = #state{peername = Peername}) -> - lager:warning("Client(~s): network error - ~p", - [emqttd_net:format(Peername), Reason]), - stop({shutdown, conn_closed}, State). - -rate_limit(_Size, State = #state{rate_limiter = undefined}) -> +rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> run_socket(State); -rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) -> - {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), - io:format("~p~n", [BufSizes]), - case esockd_rate_limiter:check(Limiter, Size) of +rate_limit(Size, State = #client_state{rate_limit = Limiter}) -> + case esockd_ratelimit:check(Limiter, Size) of {0, Limiter1} -> - run_socket(State#state{conn_state = running, rate_limiter = Limiter1}); + run_socket(State#client_state{conn_state = running, rate_limit = Limiter1}); {Pause, Limiter1} -> - ?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State), + ?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State), erlang:send_after(Pause, self(), activate_sock), - State#state{conn_state = blocked, rate_limiter = Limiter1} + State#client_state{conn_state = blocked, rate_limit = Limiter1} end. -run_socket(State = #state{conn_state = blocked}) -> +run_socket(State = #client_state{conn_state = blocked}) -> State; -run_socket(State = #state{await_recv = true}) -> +run_socket(State = #client_state{await_recv = true}) -> State; -run_socket(State = #state{transport = Transport, socket = Sock}) -> - Transport:async_recv(Sock, 0, infinity), - State#state{await_recv = true}. +run_socket(State = #client_state{connection = Connection}) -> + Connection:async_recv(0, infinity), + State#client_state{await_recv = true}. + +noreply(State) -> + {noreply, State, hibernate}. + +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + +stop(Reason, State) -> + {stop, Reason, State}. -send_fun(Transport, Sock) -> - fun(Data) -> - try Transport:port_command(Sock, Data) of - true -> ok - catch - error:Error -> exit({socket_error, Error}) - end - end.