diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 1819774e6..a570303ed 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -564,12 +564,10 @@ handle_msg({Closed, _Sock}, State) when handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive; Passive =:= quic_passive -> - %% In Stats Pubs = emqx_pd:reset_counter(incoming_pubs), Bytes = emqx_pd:reset_counter(incoming_bytes), - InStats = #{cnt => Pubs, oct => Bytes}, %% Run GC and Check OOM - NState1 = check_oom(run_gc(InStats, State)), + NState1 = check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State)), handle_info(activate_socket, NState1); handle_msg( Deliver = {deliver, _Topic, _Msg}, @@ -899,8 +897,7 @@ sent(#state{listener = {Type, Listener}} = State) -> true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, check_oom(run_gc(OutStats, State))}; + {ok, check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State))}; false -> {ok, State} end. @@ -1080,25 +1077,36 @@ retry_limiter(#state{channel = Channel, limiter = Limiter} = State) -> %%-------------------------------------------------------------------- %% Run GC and Check OOM -run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) -> +run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) -> case ?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone) andalso - emqx_gc:run(Stats, GcSt) + emqx_gc:run(Pubs, Bytes, GcSt) of false -> State; {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. -check_oom(State = #state{channel = Channel}) -> +check_oom(Pubs, Bytes, State = #state{channel = Channel}) -> ShutdownPolicy = emqx_config:get_zone_conf( emqx_channel:info(zone, Channel), [force_shutdown] ), - ?tp(debug, check_oom, #{policy => ShutdownPolicy}), case emqx_utils:check_oom(ShutdownPolicy) of {shutdown, Reason} -> %% triggers terminate/2 callback immediately + ?tp(warning, check_oom_shutdown, #{ + policy => ShutdownPolicy, + incoming_pubs => Pubs, + incoming_bytes => Bytes, + shutdown => Reason + }), erlang:exit({shutdown, Reason}); - _ -> + Result -> + ?tp(debug, check_oom_ok, #{ + policy => ShutdownPolicy, + incoming_pubs => Pubs, + incoming_bytes => Bytes, + result => Result + }), ok end, State. diff --git a/apps/emqx/src/emqx_gc.erl b/apps/emqx/src/emqx_gc.erl index 0edfab77d..cfea4ce8f 100644 --- a/apps/emqx/src/emqx_gc.erl +++ b/apps/emqx/src/emqx_gc.erl @@ -30,7 +30,6 @@ -export([ init/1, - run/2, run/3, info/1, reset/1 @@ -62,12 +61,7 @@ init(#{count := Count, bytes := Bytes}) -> Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], ?GCS(maps:from_list(Cnt ++ Oct)). -%% @doc Try to run GC based on reduntions of count or bytes. --spec run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state()) -> - {boolean(), gc_state()}. -run(#{cnt := Cnt, oct := Oct}, GcSt) -> - run(Cnt, Oct, GcSt). - +%% @doc Try to run GC based on reductions of count or bytes. -spec run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}. run(Cnt, Oct, ?GCS(St)) -> diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 7c6f3598c..2559452bf 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -451,8 +451,8 @@ websocket_info({incoming, Packet}, State) -> handle_incoming(Packet, State); websocket_info({outgoing, Packets}, State) -> return(enqueue(Packets, State)); -websocket_info({check_gc, Stats}, State) -> - return(check_oom(run_gc(Stats, State))); +websocket_info({check_gc, Cnt, Oct}, State) -> + return(check_oom(run_gc(Cnt, Oct, State))); websocket_info( Deliver = {deliver, _Topic, _Msg}, State = #state{listener = {Type, Listener}} @@ -691,8 +691,8 @@ when_msg_in(Packets, Msgs, State) -> %% Run GC, Check OOM %%-------------------------------------------------------------------- -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of +run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of false -> State; {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. @@ -805,11 +805,9 @@ handle_outgoing( get_active_n(Type, Listener) of true -> - Stats = #{ - cnt => emqx_pd:reset_counter(outgoing_pubs), - oct => emqx_pd:reset_counter(outgoing_bytes) - }, - postpone({check_gc, Stats}, State); + Cnt = emqx_pd:reset_counter(outgoing_pubs), + Oct = emqx_pd:reset_counter(outgoing_bytes), + postpone({check_gc, Cnt, Oct}, State); false -> State end, diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index b025e9d08..6ddeee6f1 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -525,7 +525,7 @@ t_oom_shutdown(_) -> with_conn( fun(Pid) -> Pid ! {tcp_passive, foo}, - {ok, _} = ?block_until(#{?snk_kind := check_oom}, 1000), + {ok, _} = ?block_until(#{?snk_kind := check_oom_shutdown}, 1000), {ok, _} = ?block_until(#{?snk_kind := terminate}, 100), Trace = snabbkaffe:collect_trace(), ?assertEqual(1, length(?of_kind(terminate, Trace))), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 48303d278..d04405adb 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -556,7 +556,7 @@ t_handle_outgoing(_) -> t_run_gc(_) -> GcSt = emqx_gc:init(#{count => 10, bytes => 100}), WsSt = st(#{gc_state => GcSt}), - ?ws_conn:run_gc(#{cnt => 100, oct => 10000}, WsSt). + ?ws_conn:run_gc(100, 10000, WsSt). t_enqueue(_) -> Packet = ?PUBLISH_PACKET(?QOS_0), diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 710148b94..88d249d5e 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -488,14 +488,12 @@ handle_msg({Closed, _Sock}, State) when handle_msg({Passive, _Sock}, State) when Passive == tcp_passive; Passive == ssl_passive -> - %% In Stats Bytes = emqx_pd:reset_counter(incoming_bytes), Pubs = emqx_pd:reset_counter(incoming_pkt), - InStats = #{cnt => Pubs, oct => Bytes}, %% Ensure Rate Limit - NState = ensure_rate_limit(InStats, State), + NState = ensure_rate_limit(State), %% Run GC and Check OOM - NState1 = check_oom(run_gc(InStats, NState)), + NState1 = check_oom(run_gc(Pubs, Bytes, NState)), handle_info(activate_socket, NState1); handle_msg( Deliver = {deliver, _Topic, _Msg}, @@ -861,8 +859,7 @@ sent(#state{active_n = ActiveN} = State) -> true -> Pubs = emqx_pd:reset_counter(outgoing_pkt), Bytes = emqx_pd:reset_counter(outgoing_bytes), - OutStats = #{cnt => Pubs, oct => Bytes}, - {ok, check_oom(run_gc(OutStats, State))}; + {ok, check_oom(run_gc(Pubs, Bytes, State))}; false -> {ok, State} end. @@ -917,14 +914,14 @@ handle_info(Info, State) -> %% TODO %% Why do we need this? %% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly? -ensure_rate_limit(_Stats, State) -> +ensure_rate_limit(State) -> State. %%-------------------------------------------------------------------- %% Run GC and Check OOM -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of +run_gc(Pubs, Bytes, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Pubs, Bytes, GcSt) of false -> State; {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. diff --git a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src index 8682c164c..2174e0578 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src +++ b/apps/emqx_gateway_ocpp/src/emqx_gateway_ocpp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_ocpp, [ {description, "OCPP-J 1.6 Gateway for EMQX"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, jesse, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl index 1b2434a85..13b8601ba 100644 --- a/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl +++ b/apps/emqx_gateway_ocpp/src/emqx_ocpp_connection.erl @@ -469,20 +469,18 @@ websocket_handle({Frame, _}, State) -> websocket_info({call, From, Req}, State) -> handle_call(From, Req, State); websocket_info({cast, rate_limit}, State) -> - Stats = #{ - cnt => emqx_pd:reset_counter(incoming_pubs), - oct => emqx_pd:reset_counter(incoming_bytes) - }, - NState = postpone({check_gc, Stats}, State), - return(ensure_rate_limit(Stats, NState)); + Cnt = emqx_pd:reset_counter(incoming_pubs), + Oct = emqx_pd:reset_counter(incoming_bytes), + NState = postpone({check_gc, Cnt, Oct}, State), + return(ensure_rate_limit(NState)); websocket_info({cast, Msg}, State) -> handle_info(Msg, State); websocket_info({incoming, Packet}, State) -> handle_incoming(Packet, State); websocket_info({outgoing, Packets}, State) -> return(enqueue(Packets, State)); -websocket_info({check_gc, Stats}, State) -> - return(check_oom(run_gc(Stats, State))); +websocket_info({check_gc, Cnt, Oct}, State) -> + return(check_oom(run_gc(Cnt, Oct, State))); websocket_info( Deliver = {deliver, _Topic, _Msg}, State = #state{active_n = ActiveN} @@ -601,15 +599,15 @@ handle_timeout(TRef, TMsg, State) -> %% Ensure rate limit %%-------------------------------------------------------------------- -ensure_rate_limit(_Stats, State) -> +ensure_rate_limit(State) -> State. %%-------------------------------------------------------------------- %% Run GC, Check OOM %%-------------------------------------------------------------------- -run_gc(Stats, State = #state{gc_state = GcSt}) -> - case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of +run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) -> + case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of false -> State; {_IsGC, GcSt1} -> State#state{gc_state = GcSt1} end. @@ -694,11 +692,9 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, piggyback = Piggybac NState = case emqx_pd:get_counter(outgoing_pubs) > ActiveN of true -> - Stats = #{ - cnt => emqx_pd:reset_counter(outgoing_pubs), - oct => emqx_pd:reset_counter(outgoing_bytes) - }, - postpone({check_gc, Stats}, State); + Cnt = emqx_pd:reset_counter(outgoing_pubs), + Oct = emqx_pd:reset_counter(outgoing_bytes), + postpone({check_gc, Cnt, Oct}, State); false -> State end,