From 3fec9cdf0a792ad113cac237d80313b3bf8362c6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 21 Dec 2018 11:56:22 +0800 Subject: [PATCH] Try to simulate a '{ssl_passive, Sock}' message:( --- src/emqx_connection.erl | 54 +++++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 15 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ddb7e6a85..1714632a8 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -214,8 +214,8 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> handle_info({timeout, Timer, emit_stats}, State = #state{stats_timer = Timer, - proto_state = ProtoState - }) -> + proto_state = ProtoState, + gc_state = GcState}) -> emqx_metrics:commit(), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), NewState = State#state{stats_timer = undefined}, @@ -224,8 +224,9 @@ handle_info({timeout, Timer, emit_stats}, continue -> {noreply, NewState}; hibernate -> - ok = emqx_gc:reset(), - {noreply, NewState, hibernate}; + %% going to hibernate, reset gc stats + GcState1 = emqx_gc:reset(GcState), + {noreply, NewState#state{gc_state = GcState1}, hibernate}; {shutdown, Reason} -> ?LOG(warning, "shutdown due to ~p", [Reason]), shutdown(Reason, NewState) @@ -246,22 +247,29 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info({tcp, _Sock, Data}, State) -> - ?LOG(debug, "RECV ~p", [Data]), - Oct = iolist_size(Data), - emqx_pd:update_counter(incoming_bytes, Oct), - emqx_metrics:trans(inc, 'bytes/received', Oct), - handle_packet(Data, maybe_gc({1, Oct}, State)); + process_incoming(Data, State); +%% FIXME Later +handle_info({ssl, _Sock, Data}, State) -> + process_incoming(Data, run_socket(State)); %% Rate limit here, cool:) handle_info({tcp_passive, _Sock}, State) -> {noreply, run_socket(ensure_rate_limit(State))}; +%% FIXME Later +handle_info({ssl_passive, _Sock}, State) -> + {noreply, run_socket(ensure_rate_limit(State))}; handle_info({tcp_error, _Sock, Reason}, State) -> shutdown(Reason, State); +handle_info({ssl_error, _Sock, Reason}, State) -> + shutdown(Reason, State); handle_info({tcp_closed, _Sock}, State) -> shutdown(closed, State); +handle_info({ssl_closed, _Sock}, State) -> + shutdown(closed, State); +%% Rate limit timer handle_info(activate_sock, State) -> {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})}; @@ -319,12 +327,24 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%------------------------------------------------------------------------------ -%% Parse and handle packets +%% Internals: process incoming, parse and handle packets %%------------------------------------------------------------------------------ -%% Receive and parse data +process_incoming(Data, State) -> + Oct = iolist_size(Data), + ?LOG(debug, "RECV ~p", [Data]), + emqx_pd:update_counter(incoming_bytes, Oct), + emqx_metrics:trans(inc, 'bytes/received', Oct), + case handle_packet(Data, State) of + {noreply, State1} -> + State2 = maybe_gc({1, Oct}, State1), + {noreply, ensure_stats_timer(State2)}; + Shutdown -> Shutdown + end. + +%% Parse and handle packets handle_packet(<<>>, State) -> - {noreply, ensure_stats_timer(State)}; + {noreply, State}; handle_packet(Data, State = #state{proto_state = ProtoState, parser_state = ParserState, @@ -384,8 +404,8 @@ run_socket(State = #state{conn_state = blocked}) -> State; run_socket(State = #state{transport = Transport, socket = Socket, - active_n = ActiveN}) -> - Transport:setopts(Socket, [{active, ActiveN}]), + active_n = N}) -> + ensure_ok_or_exit(Transport:setopts(Socket, [{active, N}])), State. %%------------------------------------------------------------------------------ @@ -393,7 +413,7 @@ run_socket(State = #state{transport = Transport, %%------------------------------------------------------------------------------ ensure_stats_timer(State = #state{enable_stats = true, - stats_timer = undefined, + stats_timer = undefined, idle_timeout = IdleTimeout}) -> State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. @@ -415,4 +435,8 @@ maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> maybe_gc(_, State) -> State. +ensure_ok_or_exit(ok) -> + ok; +ensure_ok_or_exit({error, Reason}) -> + self() ! {shutdown, Reason}.