Merge pull request #13180 from zmstone/0604-optimize-tcp-ssl-connection-data-send-performance

0604 optimize tcp ssl connection data send performance
This commit is contained in:
zmstone 2024-06-11 22:06:57 +02:00 committed by GitHub
commit 8a283b1cc0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 72 additions and 93 deletions

View File

@ -158,31 +158,6 @@
-define(ENABLED(X), (X =/= undefined)).
-define(ALARM_TCP_CONGEST(Channel),
list_to_binary(
io_lib:format(
"mqtt_conn/congested/~ts/~ts",
[
emqx_channel:info(clientid, Channel),
emqx_channel:info(username, Channel)
]
)
)
).
-define(ALARM_CONN_INFO_KEYS, [
socktype,
sockname,
peername,
clientid,
username,
proto_name,
proto_ver,
connected_at
]).
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-define(LIMITER_BYTES_IN, bytes).
-define(LIMITER_MESSAGE_IN, messages).
@ -603,17 +578,6 @@ handle_msg(
ActiveN = get_active_n(Type, Listener),
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) ->
case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false ->
ok
end;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({connack, ConnAck}, State) ->
@ -729,9 +693,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
shutdown(Reason, Reply, State#state{channel = NChannel});
{shutdown, Reason, Reply, OutPacket, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(OutPacket, NState),
NState2 = graceful_shutdown_transport(Reason, NState),
shutdown(Reason, Reply, NState2)
{ok, NState2} = handle_outgoing(OutPacket, NState),
NState3 = graceful_shutdown_transport(Reason, NState2),
shutdown(Reason, Reply, NState3)
end.
%%--------------------------------------------------------------------
@ -854,8 +818,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
shutdown(Reason, State#state{channel = NChannel});
{shutdown, Reason, Packet, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(Packet, NState),
shutdown(Reason, NState)
{ok, NState2} = handle_outgoing(Packet, NState),
shutdown(Reason, NState2)
end.
%%--------------------------------------------------------------------
@ -909,20 +873,36 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
%%--------------------------------------------------------------------
%% Send data
-spec send(iodata(), state()) -> ok.
send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ->
-spec send(iodata(), state()) -> {ok, state()}.
send(IoData, #state{transport = Transport, socket = Socket} = State) ->
Oct = iolist_size(IoData),
ok = emqx_metrics:inc('bytes.sent', Oct),
emqx_metrics:inc('bytes.sent', Oct),
inc_counter(outgoing_bytes, Oct),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
case Transport:async_send(Socket, IoData, []) of
case Transport:send(Socket, IoData) of
ok ->
ok;
%% NOTE: for Transport=emqx_quic_stream, it's actually an
%% async_send, sent/1 should technically be called when
%% {quic, send_complete, _Stream, true | false} is received,
%% but it is handled early for simplicity
sent(State);
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
%% @FIXME: why not just return error?
%% Defer error handling
%% so it's handled the same as tcp_closed or ssl_closed
self() ! {inet_reply, Socket, Error},
ok
{ok, State}
end.
%% Some bytes sent
sent(#state{listener = {Type, Listener}} = State) ->
%% Run GC and check OOM after certain amount of messages or bytes sent.
case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false ->
{ok, State}
end.
%%--------------------------------------------------------------------

View File

@ -34,7 +34,7 @@
fast_close/1,
shutdown/2,
ensure_ok_or_exit/2,
async_send/3,
send/2,
setopts/2,
getopts/2,
peername/1,
@ -165,7 +165,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) ->
Result
end.
async_send({quic, _Conn, Stream, _Info}, Data, _Options) ->
send({quic, _Conn, Stream, _Info}, Data) ->
case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of
{ok, _Len} -> ok;
{error, X, Y} -> {error, {X, Y}};

View File

@ -94,8 +94,7 @@ init_per_testcase(TestCase, Config) when
ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) ->
{ok, [{K, 0} || K <- Options]}
end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end),
ok = meck:expect(emqx_transport, send, fun(_Sock, _Data) -> ok end),
ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end),
case erlang:function_exported(?MODULE, TestCase, 2) of
true -> ?MODULE:TestCase(init, Config);
@ -234,9 +233,11 @@ t_handle_msg_incoming(_) ->
?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())).
t_handle_msg_outgoing(_) ->
?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())),
?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
?assertMatch(
{ok, _}, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())
),
?assertMatch({ok, _}, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())),
?assertMatch({ok, _}, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())).
t_handle_msg_tcp_error(_) ->
?assertMatch(
@ -255,18 +256,13 @@ t_handle_msg_deliver(_) ->
?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())).
t_handle_msg_inet_reply(_) ->
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0),
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())),
emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100),
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())),
?assertMatch(
{stop, {shutdown, for_testing}, _St},
handle_msg({inet_reply, for_testing, {error, for_testing}}, st())
).
t_handle_msg_connack(_) ->
?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
?assertMatch({ok, _}, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())).
t_handle_msg_close(_) ->
?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())).
@ -388,8 +384,8 @@ t_with_channel(_) ->
meck:unload(emqx_channel).
t_handle_outgoing(_) ->
?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
?assertMatch({ok, _}, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())),
?assertMatch({ok, _}, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())).
t_handle_info(_) ->
?assertMatch(

View File

@ -242,7 +242,7 @@ esockd_send(Data, #state{
}) ->
gen_udp:send(Sock, Ip, Port, Data);
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
esockd_transport:async_send(Sock, Data).
esockd_transport:send(Sock, Data).
keepalive_stats(recv) ->
emqx_pd:get_counter(recv_pkt);
@ -503,18 +503,6 @@ handle_msg(
) ->
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
%% TODO: Who will deliver this message?
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false ->
ok
end;
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({close, Reason}, State) ->
@ -630,8 +618,8 @@ handle_call(
shutdown(Reason, Reply, State#state{channel = NChannel});
{shutdown, Reason, Reply, Packet, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(Packet, NState),
shutdown(Reason, Reply, NState)
{ok, NState1} = handle_outgoing(Packet, NState),
shutdown(Reason, Reply, NState1)
end.
%%--------------------------------------------------------------------
@ -772,15 +760,15 @@ with_channel(
shutdown(Reason, State#state{channel = NChannel});
{shutdown, Reason, Packet, NChannel} ->
NState = State#state{channel = NChannel},
ok = handle_outgoing(Packet, NState),
shutdown(Reason, NState)
{ok, NState1} = handle_outgoing(Packet, NState),
shutdown(Reason, NState1)
end.
%%--------------------------------------------------------------------
%% Handle outgoing packets
handle_outgoing(_Packets = [], _State) ->
ok;
handle_outgoing(_Packets = [], State) ->
{ok, State};
handle_outgoing(
Packets,
State = #state{socket = Socket}
@ -792,12 +780,15 @@ handle_outgoing(
State
);
_ ->
lists:foreach(
fun(Packet) ->
handle_outgoing(Packet, State)
NState = lists:foldl(
fun(Packet, State0) ->
{ok, State1} = handle_outgoing(Packet, State0),
State1
end,
State,
Packets
)
),
{ok, NState}
end;
handle_outgoing(Packet, State) ->
send((serialize_and_inc_stats_fun(State))(Packet), State).
@ -842,7 +833,7 @@ serialize_and_inc_stats_fun(#state{
%%--------------------------------------------------------------------
%% Send data
-spec send(iodata(), state()) -> ok.
-spec send(iodata(), state()) -> {ok, state()}.
send(
IoData,
State = #state{
@ -858,11 +849,22 @@ send(
inc_counter(outgoing_bytes, Oct),
case esockd_send(IoData, State) of
ok ->
ok;
sent(State);
Error = {error, _Reason} ->
%% Send an inet_reply to postpone handling the error
%% Send an inet_reply to defer handling the error
self() ! {inet_reply, Socket, Error},
ok
{ok, State}
end.
sent(#state{active_n = ActiveN} = State) ->
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false ->
{ok, State}
end.
%%--------------------------------------------------------------------

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.32"},
{vsn, "0.1.33"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]},

View File

@ -0,0 +1 @@
Improve client message handling performance when running on OTP 26.