chore: log shutdown reason for check_oom trace log

This commit is contained in:
zhongwencool 2024-06-27 11:41:00 +08:00
parent 10e9fed22b
commit 6a78951715
8 changed files with 47 additions and 54 deletions

View File

@ -564,12 +564,10 @@ handle_msg({Closed, _Sock}, State) when
handle_msg({Passive, _Sock}, State) when
Passive == tcp_passive; Passive == ssl_passive; Passive =:= quic_passive
->
%% In Stats
Pubs = emqx_pd:reset_counter(incoming_pubs),
Bytes = emqx_pd:reset_counter(incoming_bytes),
InStats = #{cnt => Pubs, oct => Bytes},
%% Run GC and Check OOM
NState1 = check_oom(run_gc(InStats, State)),
NState1 = check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State)),
handle_info(activate_socket, NState1);
handle_msg(
Deliver = {deliver, _Topic, _Msg},
@ -899,8 +897,7 @@ sent(#state{listener = {Type, Listener}} = State) ->
true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
{ok, check_oom(Pubs, Bytes, run_gc(Pubs, Bytes, State))};
false ->
{ok, State}
end.
@ -1080,25 +1077,36 @@ retry_limiter(#state{channel = Channel, limiter = Limiter} = State) ->
%%--------------------------------------------------------------------
%% Run GC and Check OOM
run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) ->
run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) ->
case
?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone) andalso
emqx_gc:run(Stats, GcSt)
emqx_gc:run(Pubs, Bytes, GcSt)
of
false -> State;
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
end.
check_oom(State = #state{channel = Channel}) ->
check_oom(Pubs, Bytes, State = #state{channel = Channel}) ->
ShutdownPolicy = emqx_config:get_zone_conf(
emqx_channel:info(zone, Channel), [force_shutdown]
),
?tp(debug, check_oom, #{policy => ShutdownPolicy}),
case emqx_utils:check_oom(ShutdownPolicy) of
{shutdown, Reason} ->
%% triggers terminate/2 callback immediately
?tp(warning, check_oom_shutdown, #{
policy => ShutdownPolicy,
incoming_pubs => Pubs,
incoming_bytes => Bytes,
shutdown => Reason
}),
erlang:exit({shutdown, Reason});
_ ->
Result ->
?tp(debug, check_oom_ok, #{
policy => ShutdownPolicy,
incoming_pubs => Pubs,
incoming_bytes => Bytes,
result => Result
}),
ok
end,
State.

View File

@ -30,7 +30,6 @@
-export([
init/1,
run/2,
run/3,
info/1,
reset/1
@ -62,12 +61,7 @@ init(#{count := Count, bytes := Bytes}) ->
Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
?GCS(maps:from_list(Cnt ++ Oct)).
%% @doc Try to run GC based on reduntions of count or bytes.
-spec run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state()) ->
{boolean(), gc_state()}.
run(#{cnt := Cnt, oct := Oct}, GcSt) ->
run(Cnt, Oct, GcSt).
%% @doc Try to run GC based on reductions of count or bytes.
-spec run(pos_integer(), pos_integer(), gc_state()) ->
{boolean(), gc_state()}.
run(Cnt, Oct, ?GCS(St)) ->

View File

@ -451,8 +451,8 @@ websocket_info({incoming, Packet}, State) ->
handle_incoming(Packet, State);
websocket_info({outgoing, Packets}, State) ->
return(enqueue(Packets, State));
websocket_info({check_gc, Stats}, State) ->
return(check_oom(run_gc(Stats, State)));
websocket_info({check_gc, Cnt, Oct}, State) ->
return(check_oom(run_gc(Cnt, Oct, State)));
websocket_info(
Deliver = {deliver, _Topic, _Msg},
State = #state{listener = {Type, Listener}}
@ -691,8 +691,8 @@ when_msg_in(Packets, Msgs, State) ->
%% Run GC, Check OOM
%%--------------------------------------------------------------------
run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of
false -> State;
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
end.
@ -805,11 +805,9 @@ handle_outgoing(
get_active_n(Type, Listener)
of
true ->
Stats = #{
cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes)
},
postpone({check_gc, Stats}, State);
Cnt = emqx_pd:reset_counter(outgoing_pubs),
Oct = emqx_pd:reset_counter(outgoing_bytes),
postpone({check_gc, Cnt, Oct}, State);
false ->
State
end,

View File

@ -525,7 +525,7 @@ t_oom_shutdown(_) ->
with_conn(
fun(Pid) ->
Pid ! {tcp_passive, foo},
{ok, _} = ?block_until(#{?snk_kind := check_oom}, 1000),
{ok, _} = ?block_until(#{?snk_kind := check_oom_shutdown}, 1000),
{ok, _} = ?block_until(#{?snk_kind := terminate}, 100),
Trace = snabbkaffe:collect_trace(),
?assertEqual(1, length(?of_kind(terminate, Trace))),

View File

@ -556,7 +556,7 @@ t_handle_outgoing(_) ->
t_run_gc(_) ->
GcSt = emqx_gc:init(#{count => 10, bytes => 100}),
WsSt = st(#{gc_state => GcSt}),
?ws_conn:run_gc(#{cnt => 100, oct => 10000}, WsSt).
?ws_conn:run_gc(100, 10000, WsSt).
t_enqueue(_) ->
Packet = ?PUBLISH_PACKET(?QOS_0),

View File

@ -488,14 +488,12 @@ handle_msg({Closed, _Sock}, State) when
handle_msg({Passive, _Sock}, State) when
Passive == tcp_passive; Passive == ssl_passive
->
%% In Stats
Bytes = emqx_pd:reset_counter(incoming_bytes),
Pubs = emqx_pd:reset_counter(incoming_pkt),
InStats = #{cnt => Pubs, oct => Bytes},
%% Ensure Rate Limit
NState = ensure_rate_limit(InStats, State),
NState = ensure_rate_limit(State),
%% Run GC and Check OOM
NState1 = check_oom(run_gc(InStats, NState)),
NState1 = check_oom(run_gc(Pubs, Bytes, NState)),
handle_info(activate_socket, NState1);
handle_msg(
Deliver = {deliver, _Topic, _Msg},
@ -861,8 +859,7 @@ sent(#state{active_n = ActiveN} = State) ->
true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
{ok, check_oom(run_gc(Pubs, Bytes, State))};
false ->
{ok, State}
end.
@ -917,14 +914,14 @@ handle_info(Info, State) ->
%% TODO
%% Why do we need this?
%% Why not use the esockd connection limiter (based on emqx_htb_limiter) directly?
ensure_rate_limit(_Stats, State) ->
ensure_rate_limit(State) ->
State.
%%--------------------------------------------------------------------
%% Run GC and Check OOM
run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
run_gc(Pubs, Bytes, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Pubs, Bytes, GcSt) of
false -> State;
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
end.

View File

@ -1,6 +1,6 @@
{application, emqx_gateway_ocpp, [
{description, "OCPP-J 1.6 Gateway for EMQX"},
{vsn, "0.1.4"},
{vsn, "0.1.5"},
{registered, []},
{applications, [kernel, stdlib, jesse, emqx, emqx_gateway]},
{env, []},

View File

@ -469,20 +469,18 @@ websocket_handle({Frame, _}, State) ->
websocket_info({call, From, Req}, State) ->
handle_call(From, Req, State);
websocket_info({cast, rate_limit}, State) ->
Stats = #{
cnt => emqx_pd:reset_counter(incoming_pubs),
oct => emqx_pd:reset_counter(incoming_bytes)
},
NState = postpone({check_gc, Stats}, State),
return(ensure_rate_limit(Stats, NState));
Cnt = emqx_pd:reset_counter(incoming_pubs),
Oct = emqx_pd:reset_counter(incoming_bytes),
NState = postpone({check_gc, Cnt, Oct}, State),
return(ensure_rate_limit(NState));
websocket_info({cast, Msg}, State) ->
handle_info(Msg, State);
websocket_info({incoming, Packet}, State) ->
handle_incoming(Packet, State);
websocket_info({outgoing, Packets}, State) ->
return(enqueue(Packets, State));
websocket_info({check_gc, Stats}, State) ->
return(check_oom(run_gc(Stats, State)));
websocket_info({check_gc, Cnt, Oct}, State) ->
return(check_oom(run_gc(Cnt, Oct, State)));
websocket_info(
Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}
@ -601,15 +599,15 @@ handle_timeout(TRef, TMsg, State) ->
%% Ensure rate limit
%%--------------------------------------------------------------------
ensure_rate_limit(_Stats, State) ->
ensure_rate_limit(State) ->
State.
%%--------------------------------------------------------------------
%% Run GC, Check OOM
%%--------------------------------------------------------------------
run_gc(Stats, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
run_gc(Cnt, Oct, State = #state{gc_state = GcSt}) ->
case ?ENABLED(GcSt) andalso emqx_gc:run(Cnt, Oct, GcSt) of
false -> State;
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
end.
@ -694,11 +692,9 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, piggyback = Piggybac
NState =
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
true ->
Stats = #{
cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes)
},
postpone({check_gc, Stats}, State);
Cnt = emqx_pd:reset_counter(outgoing_pubs),
Oct = emqx_pd:reset_counter(outgoing_bytes),
postpone({check_gc, Cnt, Oct}, State);
false ->
State
end,