Merge pull request #9730 from kjellwinblad/kjell/fix/resource_atom_leak/EMQX-8583

fix: remove atom leaks
Kjell Winblad 2023-01-13 14:38:28 +01:00 committed by GitHub
commit 1ac03ab208
5 changed files with 91 additions and 85 deletions

View File

@@ -54,7 +54,7 @@
 % State record
 -record(data, {
-    id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error
+    id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
 }).
 -type data() :: #data{}.
@@ -296,17 +296,16 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         state = undefined,
         error = undefined
     },
-    Module = atom_to_binary(?MODULE),
-    ProcName = binary_to_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
-    gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []).
+    gen_statem:start_link(?MODULE, {Data, Opts}, []).
 
 init({Data, Opts}) ->
     process_flag(trap_exit, true),
     %% init the cache so that lookup/1 will always return something
-    insert_cache(Data#data.id, Data#data.group, Data),
+    DataWithPid = Data#data{pid = self()},
+    insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid),
     case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
-        true -> {ok, connecting, Data, {next_event, internal, start_resource}};
-        false -> {ok, stopped, Data}
+        true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}};
+        false -> {ok, stopped, DataWithPid}
     end.
 
 terminate(_Reason, _State, Data) ->
@@ -649,10 +648,12 @@ do_wait_for_ready(ResId, Retry) ->
 safe_call(ResId, Message, Timeout) ->
     try
-        Module = atom_to_binary(?MODULE),
-        MgrId = get_owner(ResId),
-        ProcName = binary_to_existing_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
-        gen_statem:call(ProcName, Message, {clean_timeout, Timeout})
+        case read_cache(ResId) of
+            not_found ->
+                {error, not_found};
+            {_, #data{pid = ManagerPid}} ->
+                gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout})
+        end
     catch
         error:badarg ->
             {error, not_found};
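Why this matters: Erlang atoms are never garbage collected, so minting a fresh registered name per resource instance via binary_to_atom slowly fills the atom table until the VM aborts. The hunks above drop the registered name entirely and call the manager by its pid, which is looked up from the cache. A minimal standalone sketch of the two call paths (module, table, and function names below are illustrative assumptions, not EMQX code):

-module(atom_leak_sketch).
-export([call_by_name/3, call_by_pid/3]).

%% Before-style: builds a new atom for every distinct resource id.
%% Atoms are never reclaimed, so this eventually exhausts the atom table.
call_by_name(ResId, Msg, Timeout) ->
    ProcName = binary_to_atom(<<"manager_", ResId/binary>>, utf8),
    gen_statem:call(ProcName, Msg, {clean_timeout, Timeout}).

%% After-style: resolve the manager pid from an ETS cache and call the pid
%% directly; no atom is created, no matter how many resources exist.
call_by_pid(ResId, Msg, Timeout) ->
    case ets:lookup(manager_cache, ResId) of
        [] -> {error, not_found};
        [{ResId, ManagerPid}] -> gen_statem:call(ManagerPid, Msg, {clean_timeout, Timeout})
    end.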

View File

@@ -76,7 +76,7 @@
 -type data() :: #{
     id => id(),
     index => index(),
-    name => atom(),
+    inflight_tid => ets:tid(),
     batch_size => pos_integer(),
     batch_time => timer:time(),
     queue => replayq:q(),
@@ -87,7 +87,7 @@
 callback_mode() -> [state_functions, state_enter].
 
 start_link(Id, Index, Opts) ->
-    gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
+    gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
 
 -spec sync_query(id(), request(), query_opts()) -> Result :: term().
 sync_query(Id, Request, Opts) ->
@@ -133,11 +133,11 @@ simple_async_query(Id, Request, ReplyFun) ->
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
--spec block(pid() | atom()) -> ok.
+-spec block(pid()) -> ok.
 block(ServerRef) ->
     gen_statem:cast(ServerRef, block).
 
--spec resume(pid() | atom()) -> ok.
+-spec resume(pid()) -> ok.
 resume(ServerRef) ->
     gen_statem:cast(ServerRef, resume).
@@ -145,7 +145,6 @@ resume(ServerRef) ->
 init({Id, Index, Opts}) ->
     process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Id, {Id, Index}),
-    Name = name(Id, Index),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
     TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
@@ -165,12 +164,12 @@ init({Id, Index, Opts}) ->
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
-    ok = inflight_new(Name, InfltWinSZ, Id, Index),
+    InflightTID = inflight_new(InfltWinSZ, Id, Index),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
     St = #{
         id => Id,
         index => Index,
-        name => Name,
+        inflight_tid => InflightTID,
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
@@ -283,14 +282,14 @@ pick_cast(Id, Key, Query) ->
         ok
     end).
 
-do_resume(#{id := Id, name := Name} = Data) ->
-    case inflight_get_first(Name) of
+do_resume(#{id := Id, inflight_tid := InflightTID} = Data) ->
+    case inflight_get_first(InflightTID) of
         empty ->
             retry_queue(Data);
         {Ref, FirstQuery} ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            retry_inflight_sync(Id, Ref, FirstQuery, Name, Data)
+            retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data)
     end.
 
 retry_queue(
@@ -299,7 +298,7 @@ retry_queue(
         id := Id,
         index := Index,
         batch_size := 1,
-        name := Name,
+        inflight_tid := InflightTID,
         resume_interval := ResumeT
     } = Data0
 ) ->
@@ -308,7 +307,7 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
-            QueryOpts = #{inflight_name => Name},
+            QueryOpts = #{inflight_name => InflightTID},
             Result = call_query(configured, Id, Index, Query, QueryOpts),
             Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
             case reply_caller(Id, Reply) of
@@ -327,7 +326,7 @@ retry_queue(
         id := Id,
         index := Index,
         batch_size := BatchSize,
-        name := Name,
+        inflight_tid := InflightTID,
         resume_interval := ResumeT
     } = Data0
 ) ->
@@ -336,7 +335,7 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, Batch0} ->
-            QueryOpts = #{inflight_name => Name},
+            QueryOpts = #{inflight_name => InflightTID},
             Result = call_query(configured, Id, Index, Batch0, QueryOpts),
             %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
             %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
@@ -361,7 +360,7 @@ retry_inflight_sync(
     Id,
     Ref,
     QueryOrBatch,
-    Name,
+    InflightTID,
     #{index := Index, resume_interval := ResumeT} = Data0
 ) ->
     QueryOpts = #{},
@@ -375,7 +374,7 @@ retry_inflight_sync(
             {keep_state, Data0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
-            inflight_drop(Name, Ref, Id, Index),
+            inflight_drop(InflightTID, Ref, Id, Index),
             do_resume(Data0)
     end.
@@ -451,11 +450,11 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
     #{
         id := Id,
         index := Index,
-        name := Name
+        inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
     [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
-    QueryOpts = #{inflight_name => Name},
+    QueryOpts = #{inflight_name => InflightTID},
     Result = call_query(configured, Id, Index, Request, QueryOpts),
     IsAsync = is_async(Id),
     Data1 = cancel_flush_timer(Data0),
@@ -489,9 +488,9 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
         id := Id,
         index := Index,
         batch_size := BatchSize,
-        name := Name
+        inflight_tid := InflightTID
     } = Data0,
-    QueryOpts = #{inflight_name => Name},
+    QueryOpts = #{inflight_name => InflightTID},
     Result = call_query(configured, Id, Index, Batch, QueryOpts),
     IsAsync = is_async(Id),
     Data1 = cancel_flush_timer(Data0),
@@ -639,17 +638,17 @@ apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _Q
     ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
 apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
-    Name = maps:get(inflight_name, QueryOpts, undefined),
+    InflightTID = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_query_async,
-        case is_inflight_full(Name) of
+        case is_inflight_full(InflightTID) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:reply_after_query/7,
                 Ref = make_message_ref(),
-                Args = [self(), Id, Index, Name, Ref, Query],
-                ok = inflight_append(Name, Ref, Query, Id, Index),
+                Args = [self(), Id, Index, InflightTID, Ref, Query],
+                ok = inflight_append(InflightTID, Ref, Query, Id, Index),
                 Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
                 {async_return, Result}
         end,
@@ -661,25 +660,25 @@ apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _Qu
     ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
-    Name = maps:get(inflight_name, QueryOpts, undefined),
+    InflightTID = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_batch_query_async,
-        case is_inflight_full(Name) of
+        case is_inflight_full(InflightTID) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:batch_reply_after_query/7,
                 Ref = make_message_ref(),
-                ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
+                ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
                 Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
-                ok = inflight_append(Name, Ref, Batch, Id, Index),
+                ok = inflight_append(InflightTID, Ref, Batch, Id, Index),
                 Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
                 {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
+reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
@@ -687,10 +686,10 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent),
         true ->
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+            drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
     end.
 
-batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
+batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
@@ -698,16 +697,16 @@ batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
         true ->
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+            drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
     end.
 
-drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
-    case is_inflight_full(Name) of
+drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
+    case is_inflight_full(InflightTID) of
         true ->
-            inflight_drop(Name, Ref, Id, Index),
+            inflight_drop(InflightTID, Ref, Id, Index),
             ?MODULE:resume(Pid);
         false ->
-            inflight_drop(Name, Ref, Id, Index)
+            inflight_drop(InflightTID, Ref, Id, Index)
     end.
 
 %%==============================================================================
@@ -757,82 +756,85 @@ get_first_n_from_queue(Q, N) ->
 %% the inflight queue for async query
 -define(MAX_SIZE_REF, -1).
 -define(SIZE_REF, -2).
 
-inflight_new(Name, InfltWinSZ, Id, Index) ->
-    _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
+inflight_new(InfltWinSZ, Id, Index) ->
+    TableId = ets:new(
+        emqx_resource_worker_inflight_tab,
+        [ordered_set, public, {write_concurrency, true}]
+    ),
+    inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
     %% we use this counter because we might deal with batches as
     %% elements.
-    inflight_append(Name, ?SIZE_REF, 0, Id, Index),
-    ok.
+    inflight_append(TableId, ?SIZE_REF, 0, Id, Index),
+    TableId.
 
-inflight_get_first(Name) ->
-    case ets:next(Name, ?MAX_SIZE_REF) of
+inflight_get_first(InflightTID) ->
+    case ets:next(InflightTID, ?MAX_SIZE_REF) of
         '$end_of_table' ->
             empty;
         Ref ->
-            case ets:lookup(Name, Ref) of
+            case ets:lookup(InflightTID, Ref) of
                 [Object] ->
                     Object;
                 [] ->
                     %% it might have been dropped
-                    inflight_get_first(Name)
+                    inflight_get_first(InflightTID)
             end
     end.
 
 is_inflight_full(undefined) ->
     false;
-is_inflight_full(Name) ->
-    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?MAX_SIZE_REF),
+is_inflight_full(InflightTID) ->
+    [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
     %% we consider number of batches rather than number of messages
     %% because one batch request may hold several messages.
-    Size = inflight_num_batches(Name),
+    Size = inflight_num_batches(InflightTID),
     Size >= MaxSize.
 
-inflight_num_batches(Name) ->
+inflight_num_batches(InflightTID) ->
     %% Note: we subtract 2 because there're 2 metadata rows that hold
     %% the maximum size value and the number of messages.
     MetadataRowCount = 2,
-    case ets:info(Name, size) of
+    case ets:info(InflightTID, size) of
         undefined -> 0;
         Size -> max(0, Size - MetadataRowCount)
     end.
 
-inflight_num_msgs(Name) ->
-    [{_, Size}] = ets:lookup(Name, ?SIZE_REF),
+inflight_num_msgs(InflightTID) ->
+    [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
     Size.
 
 inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
     ok;
-inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
+inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
     Batch = mark_as_sent(Batch0),
-    ets:insert(Name, {Ref, Batch}),
+    ets:insert(InflightTID, {Ref, Batch}),
     BatchSize = length(Batch),
-    ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok;
-inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
+inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
     Query = mark_as_sent(Query0),
-    ets:insert(Name, {Ref, Query}),
-    ets:update_counter(Name, ?SIZE_REF, {2, 1}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ets:insert(InflightTID, {Ref, Query}),
+    ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok;
-inflight_append(Name, Ref, Data, _Id, _Index) ->
-    ets:insert(Name, {Ref, Data}),
+inflight_append(InflightTID, Ref, Data, _Id, _Index) ->
+    ets:insert(InflightTID, {Ref, Data}),
     %% this is a metadata row being inserted; therefore, we don't bump
     %% the inflight metric.
     ok.
 
 inflight_drop(undefined, _, _Id, _Index) ->
     ok;
-inflight_drop(Name, Ref, Id, Index) ->
+inflight_drop(InflightTID, Ref, Id, Index) ->
     Count =
-        case ets:take(Name, Ref) of
+        case ets:take(InflightTID, Ref) of
             [{Ref, ?QUERY(_, _, _)}] -> 1;
             [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
             _ -> 0
         end,
-    Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    Count > 0 andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok.
 
 %%==============================================================================
@@ -868,13 +870,6 @@ assert_ok_result(R) ->
 queue_count(Q) ->
     replayq:count(Q).
 
--spec name(id(), integer()) -> atom().
-name(Id, Index) ->
-    Mod = atom_to_list(?MODULE),
-    Id1 = binary_to_list(Id),
-    Index1 = integer_to_list(Index),
-    list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])).
-
 disk_queue_dir(Id, Index) ->
     QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
     filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).
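The same principle drives the worker changes above: instead of one registered worker name and one named ETS table per (resource, index) pair, both of which required list_to_atom, the inflight window is now an anonymous table whose identifier travels in the worker state. A small standalone sketch of the pattern (module and names are illustrative assumptions, not the EMQX code):

-module(unnamed_table_sketch).
-export([new/1, count/1]).

%% ets:new/2 without the named_table option returns a table identifier
%% (a reference); the first argument is then only a label, so the same
%% atom can be reused for every table and no new atoms are ever created.
new(MaxSize) ->
    Tid = ets:new(inflight_sketch_tab, [ordered_set, public, {write_concurrency, true}]),
    true = ets:insert(Tid, {max_size, MaxSize}),
    Tid.

count(Tid) ->
    ets:info(Tid, size).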

View File

@@ -944,7 +944,15 @@ t_create_dry_run_local(_) ->
         end,
         lists:seq(1, 10)
     ),
-    [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
+    case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of
+        false ->
+            %% Sleep to remove flakyness in test case. It take some time for
+            %% the ETS table to be cleared.
+            timer:sleep(2000),
+            [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'});
+        true ->
+            ok
+    end.
 
 create_dry_run_local_succ() ->
     case whereis(test_resource) of
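The committed fix above always sleeps a flat 2 seconds when the owner table is not yet empty. A common alternative, shown here only as a hypothetical sketch and not part of this change, is to poll with a deadline so the test waits no longer than it has to:

%% Hypothetical helper: re-check every 100 ms for up to DeadlineMs instead of
%% taking a single fixed sleep.
wait_until_no_owners(DeadlineMs) ->
    case ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of
        [] -> ok;
        _ when DeadlineMs =< 0 -> error(owners_still_present);
        _ ->
            timer:sleep(100),
            wait_until_no_owners(DeadlineMs - 100)
    end.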

View File

@@ -0,0 +1 @@
+Potential leaks of atoms that could lead to a crash if a lot of resources were created have been removed.

View File

@@ -0,0 +1 @@
+如果创建了大量的资源,可能会导致崩溃的潜在的原子泄漏已经被删除。
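The crash the changelog entries refer to is the VM aborting once the atom table fills up (roughly one million atoms with default settings). A quick, illustrative way to watch atom-table pressure while creating and dropping resources, using plain OTP calls and not part of this change:

%% Illustrative only: report current atom usage against the VM limit.
%% erlang:system_info(atom_count) is available since OTP 20.
report_atom_usage() ->
    Count = erlang:system_info(atom_count),
    Limit = erlang:system_info(atom_limit),
    io:format("atoms in use: ~p of ~p (~.1f%)~n", [Count, Limit, 100 * Count / Limit]).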