From 47710c36aa406b95171d593f98b11fbc83adfa38 Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 26 Oct 2015 09:25:57 +0800 Subject: [PATCH] port_command --- src/emqttd_client.erl | 126 +++++++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 51 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index de7c10766..b8909ff32 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% MQTT Client +%%% MQTT Client Connection. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -52,7 +52,7 @@ conn_name, await_recv, conn_state, - conserve, + rate_limiter, parser, proto_state, packet_opts, @@ -85,22 +85,26 @@ unsubscribe(CPid, Topics) -> init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) -> % Transform if ssl. - {ok, NewSock} = esockd_connection:accept(SockArgs), + {ok, NewSock} = esockd_connection:accept(SockArgs), + %%TODO:... + {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), + io:format("~p~n", [BufSizes]), {ok, Peername} = emqttd_net:peername(Sock), - {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), - SendFun = fun(Data) -> Transport:send(NewSock, Data) end, - PktOpts = proplists:get_value(packet, MqttEnv), + {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), + SendFun = send_fun(Transport, NewSock), + PktOpts = proplists:get_value(packet, MqttEnv), ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts), - State = control_throttle(#state{transport = Transport, - socket = NewSock, - peername = Peername, - conn_name = ConnStr, - await_recv = false, - conn_state = running, - conserve = false, - packet_opts = PktOpts, - parser = emqttd_parser:new(PktOpts), - proto_state = ProtoState}), + Limiter = proplists:get_value(rate_limiter, MqttEnv), + State = run_socket(#state{transport = Transport, + socket = NewSock, + peername = Peername, + conn_name = ConnStr, + await_recv = false, + conn_state = running, + rate_limiter = Limiter, + packet_opts = PktOpts, + parser = emqttd_parser:new(PktOpts), + proto_state = ProtoState}), ClientOpts = proplists:get_value(client, MqttEnv), IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10), gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)). @@ -146,20 +150,26 @@ handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = Prot {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), noreply(State#state{proto_state = ProtoState1}); -handle_info({inet_reply, _Ref, ok}, State) -> - noreply(State); +handle_info(activate_sock, State) -> + noreply(run_socket(State#state{conn_state = running})); handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) -> + Size = size(Data), lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]), - emqttd_metrics:inc('bytes/received', size(Data)), - received(Data, control_throttle(State #state{await_recv = false})); + emqttd_metrics:inc('bytes/received', Size), + received(Data, rate_limit(Size, State#state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> + %%TODO: ... network_error(Reason, State); +handle_info({inet_reply, _Ref, ok}, State) -> + %%TODO: ok... + io:format("inet_reply ok~n"), + noreply(State); + handle_info({inet_reply, _Sock, {error, Reason}}, State) -> - ?ERROR("Unexpected inet_reply - ~p", [Reason], State), - {noreply, State}; + network_error(Reason, State); handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) -> ?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State), @@ -174,14 +184,14 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of - {ok, KeepAlive1} -> - noreply(State#state{keepalive = KeepAlive1}); - {error, timeout} -> - ?DEBUG("Keepalive Timeout!", [], State), - stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); - {error, Error} -> - ?DEBUG("Keepalive Error - ~p", [Error], State), - stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) + {ok, KeepAlive1} -> + noreply(State#state{keepalive = KeepAlive1}); + {error, timeout} -> + ?DEBUG("Keepalive Timeout!", [], State), + stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined}); + {error, Error} -> + ?DEBUG("Keepalive Error - ~p", [Error], State), + stop({shutdown, keepalive_error}, State#state{keepalive = undefined}) end; handle_info(Info, State) -> @@ -223,27 +233,27 @@ with_session(Fun, State = #state{proto_state = ProtoState}) -> %% receive and parse tcp data received(<<>>, State) -> - {noreply, State, hibernate}; + noreply(State); -received(Bytes, State = #state{packet_opts = PacketOpts, - parser = Parser, +received(Bytes, State = #state{parser = Parser, + packet_opts = PacketOpts, proto_state = ProtoState}) -> case catch Parser(Bytes) of {more, NewParser} -> - noreply(control_throttle(State#state{parser = NewParser})); + noreply(run_socket(State#state{parser = NewParser})); {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of - {ok, ProtoState1} -> - received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), - proto_state = ProtoState1}); - {error, Error} -> - ?ERROR("Protocol error - ~p", [Error], State), - stop({shutdown, Error}, State); - {error, Error, ProtoState1} -> - stop({shutdown, Error}, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + {ok, ProtoState1} -> + received(Rest, State#state{parser = emqttd_parser:new(PacketOpts), + proto_state = ProtoState1}); + {error, Error} -> + ?ERROR("Protocol error - ~p", [Error], State), + stop({shutdown, Error}, State); + {error, Error, ProtoState1} -> + stop({shutdown, Error}, State#state{proto_state = ProtoState1}); + {stop, Reason, ProtoState1} -> + stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?ERROR("Framing error - ~p", [Error], State), @@ -258,6 +268,20 @@ network_error(Reason, State = #state{peername = Peername}) -> [emqttd_net:format(Peername), Reason]), stop({shutdown, conn_closed}, State). +rate_limit(_Size, State = #state{rate_limiter = undefined}) -> + run_socket(State); +rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) -> + {ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]), + io:format("~p~n", [BufSizes]), + case esockd_rate_limiter:check(Limiter, Size) of + {0, Limiter1} -> + run_socket(State#state{conn_state = running, rate_limiter = Limiter1}); + {Pause, Limiter1} -> + ?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State), + erlang:send_after(Pause, self(), activate_sock), + State#state{conn_state = blocked, rate_limiter = Limiter1} + end. + run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{await_recv = true}) -> @@ -266,11 +290,11 @@ run_socket(State = #state{transport = Transport, socket = Sock}) -> Transport:async_recv(Sock, 0, infinity), State#state{await_recv = true}. -control_throttle(State = #state{conn_state = Flow, - conserve = Conserve}) -> - case {Flow, Conserve} of - {running, true} -> State #state{conn_state = blocked}; - {blocked, false} -> run_socket(State #state{conn_state = running}); - {_, _} -> run_socket(State) +send_fun(Transport, Sock) -> + fun(Data) -> + try Transport:port_command(Sock, Data) of + true -> ok + catch + error:Error -> exit({socket_error, Error}) + end end. -