From 10e5210581ea0097ebc0c28e811135c3a449d63c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 21 Dec 2018 15:39:24 +0800 Subject: [PATCH] Workaround ssl:setopts(SslSock, [{active, N}]) (#2095) * Set '{active, true}' for SSL socket --- src/emqx_connection.erl | 51 ++++++++++++++++++++------------------- test/emqx_stats_tests.erl | 4 +-- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 1714632a8..9f2572b32 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -246,11 +246,8 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); -handle_info({tcp, _Sock, Data}, State) -> +handle_info({TcpOrSsL, _Sock, Data}, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> process_incoming(Data, State); -%% FIXME Later -handle_info({ssl, _Sock, Data}, State) -> - process_incoming(Data, run_socket(State)); %% Rate limit here, cool:) handle_info({tcp_passive, _Sock}, State) -> @@ -259,14 +256,10 @@ handle_info({tcp_passive, _Sock}, State) -> handle_info({ssl_passive, _Sock}, State) -> {noreply, run_socket(ensure_rate_limit(State))}; -handle_info({tcp_error, _Sock, Reason}, State) -> - shutdown(Reason, State); -handle_info({ssl_error, _Sock, Reason}, State) -> +handle_info({Err, _Sock, Reason}, State) when Err =:= tcp_error; Err =:= ssl_error -> shutdown(Reason, State); -handle_info({tcp_closed, _Sock}, State) -> - shutdown(closed, State); -handle_info({ssl_closed, _Sock}, State) -> +handle_info({Closed, _Sock}, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> shutdown(closed, State); %% Rate limit timer @@ -380,7 +373,6 @@ reset_parser(State = #state{proto_state = ProtoState}) -> %%------------------------------------------------------------------------------ %% Ensure rate limit -%%------------------------------------------------------------------------------ ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)}, @@ -400,17 +392,26 @@ ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> setelement(Pos, State#state{conn_state = blocked, limit_timer = TRef}, Rl1) end. +%%------------------------------------------------------------------------------ +%% Activate socket + run_socket(State = #state{conn_state = blocked}) -> State; -run_socket(State = #state{transport = Transport, - socket = Socket, - active_n = N}) -> - ensure_ok_or_exit(Transport:setopts(Socket, [{active, N}])), + +run_socket(State = #state{transport = Transport, socket = Socket, active_n = N}) -> + TrueOrN = case Transport:is_ssl(Socket) of + true -> true; %% Cannot set '{active, N}' for SSL:( + false -> N + end, + ensure_ok_or_exit(Transport:setopts(Socket, [{active, TrueOrN}])), State. +ensure_ok_or_exit(ok) -> ok; +ensure_ok_or_exit({error, Reason}) -> + self() ! {shutdown, Reason}. + %%------------------------------------------------------------------------------ %% Ensure stats timer -%%------------------------------------------------------------------------------ ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, @@ -418,11 +419,8 @@ ensure_stats_timer(State = #state{enable_stats = true, State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. -shutdown(Reason, State) -> - stop({shutdown, Reason}, State). - -stop(Reason, State) -> - {stop, Reason, State}. +%%------------------------------------------------------------------------------ +%% Maybe GC maybe_gc(_, State = #state{gc_state = undefined}) -> State; @@ -435,8 +433,11 @@ maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> maybe_gc(_, State) -> State. -ensure_ok_or_exit(ok) -> - ok; -ensure_ok_or_exit({error, Reason}) -> - self() ! {shutdown, Reason}. +%%------------------------------------------------------------------------------ +%% Shutdown or stop +shutdown(Reason, State) -> + stop({shutdown, Reason}, State). + +stop(Reason, State) -> + {stop, Reason, State}. diff --git a/test/emqx_stats_tests.erl b/test/emqx_stats_tests.erl index dd9733a88..e8b5e82af 100644 --- a/test/emqx_stats_tests.erl +++ b/test/emqx_stats_tests.erl @@ -75,10 +75,10 @@ helper_test_() -> with_proc(fun() -> TestF(CbModule, CbFun) end, TickMs) end end, - [{"emqx_broker_helper", MkTestFun(emqx_broker_helper, stats_fun)}, + [{"emqx_broker", MkTestFun(emqx_broker, stats_fun)}, {"emqx_sm", MkTestFun(emqx_sm, stats_fun)}, {"emqx_router_helper", MkTestFun(emqx_router_helper, stats_fun)}, - {"emqx_cm", MkTestFun(emqx_cm, update_conn_stats)} + {"emqx_cm", MkTestFun(emqx_cm, stats_fun)} ]. with_proc(F) ->