diff --git a/rebar.config b/rebar.config index 7a2c67eb1..98ce306f0 100644 --- a/rebar.config +++ b/rebar.config @@ -3,7 +3,7 @@ {deps, [{jsx, "2.10.0"}, {gproc, "0.8.0"}, - {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.1"}}}, + {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3461788d5..7fb8756a3 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -51,7 +51,7 @@ ]). %% Internal callback --export([wakeup_from_hib/3]). +-export([wakeup_from_hib/2]). %% Export for CT -export([set_field/3]). @@ -89,7 +89,9 @@ %% Stats Timer stats_timer :: disabled | maybe(reference()), %% Idle Timer - idle_timer :: maybe(reference()) + idle_timer :: maybe(reference()), + %% Idle Timeout + idle_timeout :: integer() }). -type(state() :: #state{}). @@ -166,13 +168,13 @@ stop(Pid) -> init(Parent, Transport, RawSocket, Options) -> case Transport:wait(RawSocket) of {ok, Socket} -> - do_init(Parent, Transport, Socket, Options); + run_loop(Parent, init_state(Transport, Socket, Options)); {error, Reason} -> ok = Transport:fast_close(RawSocket), exit_on_sock_error(Reason) end. -do_init(Parent, Transport, Socket, Options) -> +init_state(Transport, Socket, Options) -> {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), @@ -194,25 +196,31 @@ do_init(Parent, Transport, Socket, Options) -> StatsTimer = emqx_zone:stats_timer(Zone), IdleTimeout = emqx_zone:idle_timeout(Zone), IdleTimer = start_timer(IdleTimeout, idle_timeout), - emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)), + #state{transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + sockstate = idle, + active_n = ActiveN, + limiter = Limiter, + parse_state = ParseState, + serialize = Serialize, + channel = Channel, + gc_state = GcState, + stats_timer = StatsTimer, + idle_timer = IdleTimer, + idle_timeout = IdleTimeout + }. + +run_loop(Parent, State = #state{transport = Transport, + socket = Socket, + peername = Peername, + channel = Channel}) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - State = #state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - sockstate = idle, - active_n = ActiveN, - limiter = Limiter, - parse_state = ParseState, - serialize = Serialize, - channel = Channel, - gc_state = GcState, - stats_timer = StatsTimer, - idle_timer = IdleTimer - }, + emqx_misc:tune_heap_size(emqx_zone:oom_policy( + emqx_channel:info(zone, Channel))), case activate_socket(State) of - {ok, NState} -> - hibernate(Parent, NState, #{idle_timeout => IdleTimeout}); + {ok, NState} -> hibernate(Parent, NState); {error, Reason} -> ok = Transport:fast_close(Socket), exit_on_sock_error(Reason) @@ -230,28 +238,24 @@ exit_on_sock_error(Reason) -> %%-------------------------------------------------------------------- %% Recv Loop -recvloop(Parent, State, Options = #{idle_timeout := IdleTimeout}) -> +recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> receive {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, - ?MODULE, [], {State, Options}); + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); {'EXIT', Parent, Reason} -> terminate(Reason, State); Msg -> - NState = ensure_stats_timer(IdleTimeout, State), - process_msg([Msg], Parent, NState, Options) + process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)) after IdleTimeout -> - NState = cancel_stats_timer(State), - hibernate(Parent, NState, Options) + hibernate(Parent, cancel_stats_timer(State)) end. -hibernate(Parent, State, Options) -> - proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State, Options]). +hibernate(Parent, State) -> + proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]). -wakeup_from_hib(Parent, State, Options) -> - %% Maybe do something later here. - recvloop(Parent, State, Options). +%% Maybe do something here later. +wakeup_from_hib(Parent, State) -> recvloop(Parent, State). %%-------------------------------------------------------------------- %% Ensure/cancel stats timer @@ -270,17 +274,16 @@ cancel_stats_timer(State) -> State. %%-------------------------------------------------------------------- %% Process next Msg -process_msg([], Parent, State, Options) -> - recvloop(Parent, State, Options); +process_msg([], Parent, State) -> recvloop(Parent, State); -process_msg([Msg|More], Parent, State, Options) -> +process_msg([Msg|More], Parent, State) -> case catch handle_msg(Msg, State) of ok -> - process_msg(More, Parent, State, Options); + process_msg(More, Parent, State); {ok, NState} -> - process_msg(More, Parent, NState, Options); + process_msg(More, Parent, NState); {ok, Msgs, NState} -> - process_msg(append_msg(Msgs, More), Parent, NState, Options); + process_msg(append_msg(More, Msgs), Parent, NState); {stop, Reason} -> terminate(Reason, State); {stop, Reason, NState} -> @@ -289,6 +292,15 @@ process_msg([Msg|More], Parent, State, Options) -> terminate(Reason, State) end. +-compile({inline, [append_msg/2]}). +append_msg([], Msgs) when is_list(Msgs) -> + Msgs; +append_msg([], Msg) -> [Msg]; +append_msg(Q, Msgs) when is_list(Msgs) -> + lists:append(Q, Msgs); +append_msg(Q, Msg) -> + lists:append(Q, [Msg]). + %%-------------------------------------------------------------------- %% Handle a Msg @@ -410,18 +422,17 @@ terminate(Reason, State = #state{channel = Channel}) -> %%-------------------------------------------------------------------- %% Sys callbacks -system_continue(Parent, _Deb, {State, Options}) -> - recvloop(Parent, State, Options). +system_continue(Parent, _Debug, State) -> + recvloop(Parent, State). -system_terminate(Reason, _Parent, _Deb, {State, _}) -> +system_terminate(Reason, _Parent, _Debug, State) -> terminate(Reason, State). -system_code_change(Misc, _, _, _) -> - {ok, Misc}. - -system_get_state({State, _Options}) -> +system_code_change(State, _Mod, _OldVsn, _Extra) -> {ok, State}. +system_get_state(State) -> {ok, State}. + %%-------------------------------------------------------------------- %% Handle call @@ -681,18 +692,13 @@ inc_outgoing_stats(Packet = ?PACKET(Type)) -> %%-------------------------------------------------------------------- %% Helper functions --compile({inline, [append_msg/2]}). -append_msg(Msgs, Q) when is_list(Msgs) -> - lists:append(Msgs, Q); -append_msg(Msg, Q) -> [Msg|Q]. - -compile({inline, [next_msgs/1]}). next_msgs(Packet) when is_record(Packet, mqtt_packet) -> {outgoing, Packet}; -next_msgs(Action) when is_tuple(Action) -> - Action; -next_msgs(Actions) when is_list(Actions) -> - Actions. +next_msgs(Event) when is_tuple(Event) -> + Event; +next_msgs(More) when is_list(More) -> + More. -compile({inline, [shutdown/2, shutdown/3]}). shutdown(Reason, State) ->