Improve the 'emqx_connection' module for CT

This commit is contained in:
Feng Lee 2019-12-16 11:20:57 +08:00
parent f46cfbaa62
commit 635c3f75fe
2 changed files with 62 additions and 56 deletions

View File

@ -3,7 +3,7 @@
{deps, {deps,
[{jsx, "2.10.0"}, [{jsx, "2.10.0"},
{gproc, "0.8.0"}, {gproc, "0.8.0"},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.0"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.1"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},

View File

@ -51,7 +51,7 @@
]). ]).
%% Internal callback %% Internal callback
-export([wakeup_from_hib/3]). -export([wakeup_from_hib/2]).
%% Export for CT %% Export for CT
-export([set_field/3]). -export([set_field/3]).
@ -89,7 +89,9 @@
%% Stats Timer %% Stats Timer
stats_timer :: disabled | maybe(reference()), stats_timer :: disabled | maybe(reference()),
%% Idle Timer %% Idle Timer
idle_timer :: maybe(reference()) idle_timer :: maybe(reference()),
%% Idle Timeout
idle_timeout :: integer()
}). }).
-type(state() :: #state{}). -type(state() :: #state{}).
@ -166,13 +168,13 @@ stop(Pid) ->
init(Parent, Transport, RawSocket, Options) -> init(Parent, Transport, RawSocket, Options) ->
case Transport:wait(RawSocket) of case Transport:wait(RawSocket) of
{ok, Socket} -> {ok, Socket} ->
do_init(Parent, Transport, Socket, Options); run_loop(Parent, init_state(Transport, Socket, Options));
{error, Reason} -> {error, Reason} ->
ok = Transport:fast_close(RawSocket), ok = Transport:fast_close(RawSocket),
exit_on_sock_error(Reason) exit_on_sock_error(Reason)
end. end.
do_init(Parent, Transport, Socket, Options) -> init_state(Transport, Socket, Options) ->
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
{ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]), {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]), Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
@ -194,25 +196,31 @@ do_init(Parent, Transport, Socket, Options) ->
StatsTimer = emqx_zone:stats_timer(Zone), StatsTimer = emqx_zone:stats_timer(Zone),
IdleTimeout = emqx_zone:idle_timeout(Zone), IdleTimeout = emqx_zone:idle_timeout(Zone),
IdleTimer = start_timer(IdleTimeout, idle_timeout), IdleTimer = start_timer(IdleTimeout, idle_timeout),
emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)), #state{transport = Transport,
socket = Socket,
peername = Peername,
sockname = Sockname,
sockstate = idle,
active_n = ActiveN,
limiter = Limiter,
parse_state = ParseState,
serialize = Serialize,
channel = Channel,
gc_state = GcState,
stats_timer = StatsTimer,
idle_timer = IdleTimer,
idle_timeout = IdleTimeout
}.
run_loop(Parent, State = #state{transport = Transport,
socket = Socket,
peername = Peername,
channel = Channel}) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)), emqx_logger:set_metadata_peername(esockd:format(Peername)),
State = #state{transport = Transport, emqx_misc:tune_heap_size(emqx_zone:oom_policy(
socket = Socket, emqx_channel:info(zone, Channel))),
peername = Peername,
sockname = Sockname,
sockstate = idle,
active_n = ActiveN,
limiter = Limiter,
parse_state = ParseState,
serialize = Serialize,
channel = Channel,
gc_state = GcState,
stats_timer = StatsTimer,
idle_timer = IdleTimer
},
case activate_socket(State) of case activate_socket(State) of
{ok, NState} -> {ok, NState} -> hibernate(Parent, NState);
hibernate(Parent, NState, #{idle_timeout => IdleTimeout});
{error, Reason} -> {error, Reason} ->
ok = Transport:fast_close(Socket), ok = Transport:fast_close(Socket),
exit_on_sock_error(Reason) exit_on_sock_error(Reason)
@ -230,28 +238,24 @@ exit_on_sock_error(Reason) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Recv Loop %% Recv Loop
recvloop(Parent, State, Options = #{idle_timeout := IdleTimeout}) -> recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
receive receive
{system, From, Request} -> {system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
?MODULE, [], {State, Options});
{'EXIT', Parent, Reason} -> {'EXIT', Parent, Reason} ->
terminate(Reason, State); terminate(Reason, State);
Msg -> Msg ->
NState = ensure_stats_timer(IdleTimeout, State), process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State))
process_msg([Msg], Parent, NState, Options)
after after
IdleTimeout -> IdleTimeout ->
NState = cancel_stats_timer(State), hibernate(Parent, cancel_stats_timer(State))
hibernate(Parent, NState, Options)
end. end.
hibernate(Parent, State, Options) -> hibernate(Parent, State) ->
proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State, Options]). proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State]).
wakeup_from_hib(Parent, State, Options) -> %% Maybe do something here later.
%% Maybe do something later here. wakeup_from_hib(Parent, State) -> recvloop(Parent, State).
recvloop(Parent, State, Options).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure/cancel stats timer %% Ensure/cancel stats timer
@ -270,17 +274,16 @@ cancel_stats_timer(State) -> State.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process next Msg %% Process next Msg
process_msg([], Parent, State, Options) -> process_msg([], Parent, State) -> recvloop(Parent, State);
recvloop(Parent, State, Options);
process_msg([Msg|More], Parent, State, Options) -> process_msg([Msg|More], Parent, State) ->
case catch handle_msg(Msg, State) of case catch handle_msg(Msg, State) of
ok -> ok ->
process_msg(More, Parent, State, Options); process_msg(More, Parent, State);
{ok, NState} -> {ok, NState} ->
process_msg(More, Parent, NState, Options); process_msg(More, Parent, NState);
{ok, Msgs, NState} -> {ok, Msgs, NState} ->
process_msg(append_msg(Msgs, More), Parent, NState, Options); process_msg(append_msg(More, Msgs), Parent, NState);
{stop, Reason} -> {stop, Reason} ->
terminate(Reason, State); terminate(Reason, State);
{stop, Reason, NState} -> {stop, Reason, NState} ->
@ -289,6 +292,15 @@ process_msg([Msg|More], Parent, State, Options) ->
terminate(Reason, State) terminate(Reason, State)
end. end.
-compile({inline, [append_msg/2]}).
append_msg([], Msgs) when is_list(Msgs) ->
Msgs;
append_msg([], Msg) -> [Msg];
append_msg(Q, Msgs) when is_list(Msgs) ->
lists:append(Q, Msgs);
append_msg(Q, Msg) ->
lists:append(Q, [Msg]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle a Msg %% Handle a Msg
@ -410,18 +422,17 @@ terminate(Reason, State = #state{channel = Channel}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Sys callbacks %% Sys callbacks
system_continue(Parent, _Deb, {State, Options}) -> system_continue(Parent, _Debug, State) ->
recvloop(Parent, State, Options). recvloop(Parent, State).
system_terminate(Reason, _Parent, _Deb, {State, _}) -> system_terminate(Reason, _Parent, _Debug, State) ->
terminate(Reason, State). terminate(Reason, State).
system_code_change(Misc, _, _, _) -> system_code_change(State, _Mod, _OldVsn, _Extra) ->
{ok, Misc}.
system_get_state({State, _Options}) ->
{ok, State}. {ok, State}.
system_get_state(State) -> {ok, State}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Handle call %% Handle call
@ -681,18 +692,13 @@ inc_outgoing_stats(Packet = ?PACKET(Type)) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
-compile({inline, [append_msg/2]}).
append_msg(Msgs, Q) when is_list(Msgs) ->
lists:append(Msgs, Q);
append_msg(Msg, Q) -> [Msg|Q].
-compile({inline, [next_msgs/1]}). -compile({inline, [next_msgs/1]}).
next_msgs(Packet) when is_record(Packet, mqtt_packet) -> next_msgs(Packet) when is_record(Packet, mqtt_packet) ->
{outgoing, Packet}; {outgoing, Packet};
next_msgs(Action) when is_tuple(Action) -> next_msgs(Event) when is_tuple(Event) ->
Action; Event;
next_msgs(Actions) when is_list(Actions) -> next_msgs(More) when is_list(More) ->
Actions. More.
-compile({inline, [shutdown/2, shutdown/3]}). -compile({inline, [shutdown/2, shutdown/3]}).
shutdown(Reason, State) -> shutdown(Reason, State) ->