117 lines
3.5 KiB
Erlang
117 lines
3.5 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @doc
|
|
%% This module manages an opaque collection of statistics data used to
|
|
%% force garbage collection on `self()' process when hitting thresholds.
|
|
%% Namely:
|
|
%% (1) Total number of messages passed through
|
|
%% (2) Total data volume passed through
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_gc).
|
|
|
|
-include("types.hrl").
|
|
|
|
-export([ init/1
|
|
, run/2
|
|
, run/3
|
|
, info/1
|
|
, reset/1
|
|
]).
|
|
|
|
-export_type([opts/0, gc_state/0]).
|
|
|
|
-type(opts() :: #{count => integer(),
|
|
bytes => integer()}).
|
|
|
|
-type(st() :: #{cnt => {integer(), integer()},
|
|
oct => {integer(), integer()}}).
|
|
|
|
-opaque(gc_state() :: {gc_state, st()}).
|
|
|
|
-define(GCS(St), {gc_state, St}).
|
|
|
|
-define(disabled, disabled).
|
|
-define(ENABLED(X), (is_integer(X) andalso X > 0)).
|
|
|
|
%% @doc Initialize force GC state.
|
|
-spec(init(opts()) -> gc_state()).
|
|
init(#{count := Count, bytes := Bytes}) ->
|
|
Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)],
|
|
Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)],
|
|
?GCS(maps:from_list(Cnt ++ Oct)).
|
|
|
|
%% @doc Try to run GC based on reduntions of count or bytes.
|
|
-spec(run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state())
|
|
-> {boolean(), gc_state()}).
|
|
run(#{cnt := Cnt, oct := Oct}, GcSt) ->
|
|
run(Cnt, Oct, GcSt).
|
|
|
|
-spec(run(pos_integer(), pos_integer(), gc_state())
|
|
-> {boolean(), gc_state()}).
|
|
run(Cnt, Oct, ?GCS(St)) ->
|
|
{Res, St1} = do_run([{cnt, Cnt}, {oct, Oct}], St),
|
|
{Res, ?GCS(St1)}.
|
|
|
|
do_run([], St) ->
|
|
{false, St};
|
|
do_run([{K, N}|T], St) ->
|
|
case dec(K, N, St) of
|
|
{true, St1} ->
|
|
erlang:garbage_collect(),
|
|
{true, do_reset(St1)};
|
|
{false, St1} ->
|
|
do_run(T, St1)
|
|
end.
|
|
|
|
%% @doc Info of GC state.
|
|
-spec(info(maybe(gc_state())) -> maybe(map())).
|
|
info(?GCS(St)) -> St.
|
|
|
|
%% @doc Reset counters to zero.
|
|
-spec(reset(maybe(gc_state())) -> gc_state()).
|
|
reset(?GCS(St)) ->
|
|
?GCS(do_reset(St)).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec(dec(cnt | oct, pos_integer(), st()) -> {boolean(), st()}).
|
|
dec(Key, Num, St) ->
|
|
case maps:get(Key, St, ?disabled) of
|
|
?disabled ->
|
|
{false, St};
|
|
{Init, Remain} when Remain > Num ->
|
|
{false, maps:put(Key, {Init, Remain - Num}, St)};
|
|
_ ->
|
|
{true, St}
|
|
end.
|
|
|
|
do_reset(St) ->
|
|
do_reset(cnt, do_reset(oct, St)).
|
|
|
|
%% Reset counters to zero.
|
|
do_reset(Key, St) ->
|
|
case maps:get(Key, St, ?disabled) of
|
|
?disabled -> St;
|
|
{Init, _} -> maps:put(Key, {Init, Init}, St)
|
|
end.
|
|
|