diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index e0baab238..ed62fb63c 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -158,31 +158,6 @@ -define(ENABLED(X), (X =/= undefined)). --define(ALARM_TCP_CONGEST(Channel), - list_to_binary( - io_lib:format( - "mqtt_conn/congested/~ts/~ts", - [ - emqx_channel:info(clientid, Channel), - emqx_channel:info(username, Channel) - ] - ) - ) -). - --define(ALARM_CONN_INFO_KEYS, [ - socktype, - sockname, - peername, - clientid, - username, - proto_name, - proto_ver, - connected_at -]). --define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). --define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). - -define(LIMITER_BYTES_IN, bytes). -define(LIMITER_MESSAGE_IN, messages). @@ -603,17 +578,6 @@ handle_msg( ActiveN = get_active_n(Type, Listener), Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); -%% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) -> - case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of - true -> - Pubs = emqx_pd:reset_counter(outgoing_pubs), - Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, check_oom(run_gc(OutStats, State))}; - false -> - ok - end; handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({sock_error, Reason}, State); handle_msg({connack, ConnAck}, State) -> @@ -729,9 +693,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> shutdown(Reason, Reply, State#state{channel = NChannel}); {shutdown, Reason, Reply, OutPacket, NChannel} -> NState = State#state{channel = NChannel}, - ok = handle_outgoing(OutPacket, NState), - NState2 = graceful_shutdown_transport(Reason, NState), - shutdown(Reason, Reply, NState2) + {ok, NState2} = handle_outgoing(OutPacket, NState), + NState3 = graceful_shutdown_transport(Reason, NState2), + shutdown(Reason, Reply, NState3) end. %%-------------------------------------------------------------------- @@ -854,8 +818,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> shutdown(Reason, State#state{channel = NChannel}); {shutdown, Reason, Packet, NChannel} -> NState = State#state{channel = NChannel}, - ok = handle_outgoing(Packet, NState), - shutdown(Reason, NState) + {ok, NState2} = handle_outgoing(Packet, NState), + shutdown(Reason, NState2) end. %%-------------------------------------------------------------------- @@ -909,20 +873,36 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Send data --spec send(iodata(), state()) -> ok. -send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) -> +-spec send(iodata(), state()) -> {ok, state()}. +send(IoData, #state{transport = Transport, socket = Socket} = State) -> Oct = iolist_size(IoData), - ok = emqx_metrics:inc('bytes.sent', Oct), + emqx_metrics:inc('bytes.sent', Oct), inc_counter(outgoing_bytes, Oct), - emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), - case Transport:async_send(Socket, IoData, []) of + case Transport:send(Socket, IoData) of ok -> - ok; + %% NOTE: for Transport=emqx_quic_stream, it's actually an + %% async_send, sent/1 should technically be called when + %% {quic, send_complete, _Stream, true | false} is received, + %% but it is handled early for simplicity + sent(State); Error = {error, _Reason} -> - %% Send an inet_reply to postpone handling the error - %% @FIXME: why not just return error? + %% Defer error handling + %% so it's handled the same as tcp_closed or ssl_closed self() ! {inet_reply, Socket, Error}, - ok + {ok, State} + end. + +%% Some bytes sent +sent(#state{listener = {Type, Listener}} = State) -> + %% Run GC and check OOM after certain amount of messages or bytes sent. + case emqx_pd:get_counter(outgoing_pubs) > get_active_n(Type, Listener) of + true -> + Pubs = emqx_pd:reset_counter(outgoing_pubs), + Bytes = emqx_pd:reset_counter(outgoing_bytes), + OutStats = #{cnt => Pubs, oct => Bytes}, + {ok, check_oom(run_gc(OutStats, State))}; + false -> + {ok, State} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_quic_stream.erl b/apps/emqx/src/emqx_quic_stream.erl index 43f1eebfe..ca4134c25 100644 --- a/apps/emqx/src/emqx_quic_stream.erl +++ b/apps/emqx/src/emqx_quic_stream.erl @@ -34,7 +34,7 @@ fast_close/1, shutdown/2, ensure_ok_or_exit/2, - async_send/3, + send/2, setopts/2, getopts/2, peername/1, @@ -165,7 +165,7 @@ ensure_ok_or_exit(Fun, Args = [Sock | _]) when is_atom(Fun), is_list(Args) -> Result end. -async_send({quic, _Conn, Stream, _Info}, Data, _Options) -> +send({quic, _Conn, Stream, _Info}, Data) -> case quicer:async_send(Stream, Data, ?QUICER_SEND_FLAG_SYNC) of {ok, _Len} -> ok; {error, X, Y} -> {error, {X, Y}}; diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 7fffa3374..3e2db9e36 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -94,8 +94,7 @@ init_per_testcase(TestCase, Config) when ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) -> {ok, [{K, 0} || K <- Options]} end), - ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data) -> ok end), - ok = meck:expect(emqx_transport, async_send, fun(_Sock, _Data, _Opts) -> ok end), + ok = meck:expect(emqx_transport, send, fun(_Sock, _Data) -> ok end), ok = meck:expect(emqx_transport, fast_close, fun(_Sock) -> ok end), case erlang:function_exported(?MODULE, TestCase, 2) of true -> ?MODULE:TestCase(init, Config); @@ -234,9 +233,11 @@ t_handle_msg_incoming(_) -> ?assertMatch({ok, _St}, handle_msg({incoming, undefined}, st())). t_handle_msg_outgoing(_) -> - ?assertEqual(ok, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st())), - ?assertEqual(ok, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())), - ?assertEqual(ok, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())). + ?assertMatch( + {ok, _}, handle_msg({outgoing, ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>)}, st()) + ), + ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBREL_PACKET(1)}, st())), + ?assertMatch({ok, _}, handle_msg({outgoing, ?PUBCOMP_PACKET(1)}, st())). t_handle_msg_tcp_error(_) -> ?assertMatch( @@ -255,18 +256,13 @@ t_handle_msg_deliver(_) -> ?assertMatch({ok, _St}, handle_msg({deliver, topic, msg}, st())). t_handle_msg_inet_reply(_) -> - ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 0), - ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st())), - emqx_config:put_listener_conf(tcp, default, [tcp_options, active_n], 100), - ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st())), ?assertMatch( {stop, {shutdown, for_testing}, _St}, handle_msg({inet_reply, for_testing, {error, for_testing}}, st()) ). t_handle_msg_connack(_) -> - ?assertEqual(ok, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())). + ?assertMatch({ok, _}, handle_msg({connack, ?CONNACK_PACKET(?CONNACK_ACCEPT)}, st())). t_handle_msg_close(_) -> ?assertMatch({stop, {shutdown, normal}, _St}, handle_msg({close, normal}, st())). @@ -388,8 +384,8 @@ t_with_channel(_) -> meck:unload(emqx_channel). t_handle_outgoing(_) -> - ?assertEqual(ok, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), - ?assertEqual(ok, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). + ?assertMatch({ok, _}, emqx_connection:handle_outgoing(?PACKET(?PINGRESP), st())), + ?assertMatch({ok, _}, emqx_connection:handle_outgoing([?PACKET(?PINGRESP)], st())). t_handle_info(_) -> ?assertMatch( diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 84dfe44a2..710148b94 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -242,7 +242,7 @@ esockd_send(Data, #state{ }) -> gen_udp:send(Sock, Ip, Port, Data); esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> - esockd_transport:async_send(Sock, Data). + esockd_transport:send(Sock, Data). keepalive_stats(recv) -> emqx_pd:get_counter(recv_pkt); @@ -503,18 +503,6 @@ handle_msg( ) -> Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); -%% Something sent -%% TODO: Who will deliver this message? -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> - case emqx_pd:get_counter(outgoing_pkt) > ActiveN of - true -> - Pubs = emqx_pd:reset_counter(outgoing_pkt), - Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, check_oom(run_gc(OutStats, State))}; - false -> - ok - end; handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({sock_error, Reason}, State); handle_msg({close, Reason}, State) -> @@ -630,8 +618,8 @@ handle_call( shutdown(Reason, Reply, State#state{channel = NChannel}); {shutdown, Reason, Reply, Packet, NChannel} -> NState = State#state{channel = NChannel}, - ok = handle_outgoing(Packet, NState), - shutdown(Reason, Reply, NState) + {ok, NState1} = handle_outgoing(Packet, NState), + shutdown(Reason, Reply, NState1) end. %%-------------------------------------------------------------------- @@ -772,15 +760,15 @@ with_channel( shutdown(Reason, State#state{channel = NChannel}); {shutdown, Reason, Packet, NChannel} -> NState = State#state{channel = NChannel}, - ok = handle_outgoing(Packet, NState), - shutdown(Reason, NState) + {ok, NState1} = handle_outgoing(Packet, NState), + shutdown(Reason, NState1) end. %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(_Packets = [], _State) -> - ok; +handle_outgoing(_Packets = [], State) -> + {ok, State}; handle_outgoing( Packets, State = #state{socket = Socket} @@ -792,12 +780,15 @@ handle_outgoing( State ); _ -> - lists:foreach( - fun(Packet) -> - handle_outgoing(Packet, State) + NState = lists:foldl( + fun(Packet, State0) -> + {ok, State1} = handle_outgoing(Packet, State0), + State1 end, + State, Packets - ) + ), + {ok, NState} end; handle_outgoing(Packet, State) -> send((serialize_and_inc_stats_fun(State))(Packet), State). @@ -842,7 +833,7 @@ serialize_and_inc_stats_fun(#state{ %%-------------------------------------------------------------------- %% Send data --spec send(iodata(), state()) -> ok. +-spec send(iodata(), state()) -> {ok, state()}. send( IoData, State = #state{ @@ -858,11 +849,22 @@ send( inc_counter(outgoing_bytes, Oct), case esockd_send(IoData, State) of ok -> - ok; + sent(State); Error = {error, _Reason} -> - %% Send an inet_reply to postpone handling the error + %% Send an inet_reply to defer handling the error self() ! {inet_reply, Socket, Error}, - ok + {ok, State} + end. + +sent(#state{active_n = ActiveN} = State) -> + case emqx_pd:get_counter(outgoing_pkt) > ActiveN of + true -> + Pubs = emqx_pd:reset_counter(outgoing_pkt), + Bytes = emqx_pd:reset_counter(outgoing_bytes), + OutStats = #{cnt => Pubs, oct => Bytes}, + {ok, check_oom(run_gc(OutStats, State))}; + false -> + {ok, State} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 3c6634edc..fa8a774ed 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [kernel, stdlib, emqx, emqx_auth, emqx_ctl]}, diff --git a/changes/ce/feat-13180.en.md b/changes/ce/feat-13180.en.md new file mode 100644 index 000000000..255230d0d --- /dev/null +++ b/changes/ce/feat-13180.en.md @@ -0,0 +1 @@ +Improve client message handling performance when running on OTP 26.