From 8c482e03d143733552a1196f2a90042e0612220b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 11 Jan 2023 16:48:29 +0100 Subject: [PATCH 1/2] fix: remove atom leaks Both emqx_resource_managers and emqx_resource_workers leaked atoms as they created an unique atoms to use as registered names. This is fixed by removing the need to register the names. Fixes: https://emqx.atlassian.net/browse/EMQX-8583 --- .../src/emqx_resource_manager.erl | 27 ++-- .../src/emqx_resource_worker.erl | 141 +++++++++--------- changes/v5.0.14/fix-8583.en.md | 1 + changes/v5.0.14/fix-8583.zh.md | 1 + 4 files changed, 86 insertions(+), 84 deletions(-) create mode 100644 changes/v5.0.14/fix-8583.en.md create mode 100644 changes/v5.0.14/fix-8583.zh.md diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 8531f1641..06e6288e3 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -54,7 +54,7 @@ % State record -record(data, { - id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error + id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid }). -type data() :: #data{}. @@ -296,17 +296,16 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - Module = atom_to_binary(?MODULE), - ProcName = binary_to_atom(<>, utf8), - gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []). + gen_statem:start_link(?MODULE, {Data, Opts}, []). init({Data, Opts}) -> process_flag(trap_exit, true), %% init the cache so that lookup/1 will always return something - insert_cache(Data#data.id, Data#data.group, Data), + DataWithPid = Data#data{pid = self()}, + insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of - true -> {ok, connecting, Data, {next_event, internal, start_resource}}; - false -> {ok, stopped, Data} + true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}}; + false -> {ok, stopped, DataWithPid} end. terminate(_Reason, _State, Data) -> @@ -429,6 +428,14 @@ read_cache(ResId) -> [] -> not_found end. +read_manager_pid_from_cache(ResId) -> + case read_cache(ResId) of + not_found -> + erlang:error(badarg); + {_, #data{pid = ManagerPid}} -> + ManagerPid + end. + delete_cache(ResId, MgrId) -> case get_owner(ResId) of MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> @@ -649,10 +656,8 @@ do_wait_for_ready(ResId, Retry) -> safe_call(ResId, Message, Timeout) -> try - Module = atom_to_binary(?MODULE), - MgrId = get_owner(ResId), - ProcName = binary_to_existing_atom(<>, utf8), - gen_statem:call(ProcName, Message, {clean_timeout, Timeout}) + ManagerPid = read_manager_pid_from_cache(ResId), + gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout}) catch error:badarg -> {error, not_found}; diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 93bb22551..264556bb5 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -76,7 +76,7 @@ -type data() :: #{ id => id(), index => index(), - name => atom(), + inflight_tid => ets:tid(), batch_size => pos_integer(), batch_time => timer:time(), queue => replayq:q(), @@ -87,7 +87,7 @@ callback_mode() -> [state_functions, state_enter]. start_link(Id, Index, Opts) -> - gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). + gen_statem:start_link(?MODULE, {Id, Index, Opts}, []). -spec sync_query(id(), request(), query_opts()) -> Result :: term(). sync_query(Id, Request, Opts) -> @@ -133,11 +133,11 @@ simple_async_query(Id, Request, ReplyFun) -> _ = handle_query_result(Id, Result, false, false), Result. --spec block(pid() | atom()) -> ok. +-spec block(pid()) -> ok. block(ServerRef) -> gen_statem:cast(ServerRef, block). --spec resume(pid() | atom()) -> ok. +-spec resume(pid()) -> ok. resume(ServerRef) -> gen_statem:cast(ServerRef, resume). @@ -145,7 +145,6 @@ resume(ServerRef) -> init({Id, Index, Opts}) -> process_flag(trap_exit, true), true = gproc_pool:connect_worker(Id, {Id, Index}), - Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), @@ -165,12 +164,12 @@ init({Id, Index, Opts}) -> emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - ok = inflight_new(Name, InfltWinSZ, Id, Index), + {ok, InflightTID} = inflight_new(InfltWinSZ, Id, Index), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, index => Index, - name => Name, + inflight_tid => InflightTID, batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, @@ -283,14 +282,14 @@ pick_cast(Id, Key, Query) -> ok end). -do_resume(#{id := Id, name := Name} = Data) -> - case inflight_get_first(Name) of +do_resume(#{id := Id, inflight_tid := InflightTID} = Data) -> + case inflight_get_first(InflightTID) of empty -> retry_queue(Data); {Ref, FirstQuery} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. - retry_inflight_sync(Id, Ref, FirstQuery, Name, Data) + retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data) end. retry_queue( @@ -299,7 +298,7 @@ retry_queue( id := Id, index := Index, batch_size := 1, - name := Name, + inflight_tid := InflightTID, resume_interval := ResumeT } = Data0 ) -> @@ -308,7 +307,7 @@ retry_queue( empty -> {next_state, running, Data0}; {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} -> - QueryOpts = #{inflight_name => Name}, + QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Query, QueryOpts), Reply = ?REPLY(undefined, Request, HasBeenSent, Result), case reply_caller(Id, Reply) of @@ -327,7 +326,7 @@ retry_queue( id := Id, index := Index, batch_size := BatchSize, - name := Name, + inflight_tid := InflightTID, resume_interval := ResumeT } = Data0 ) -> @@ -336,7 +335,7 @@ retry_queue( empty -> {next_state, running, Data0}; {Q1, QAckRef, Batch0} -> - QueryOpts = #{inflight_name => Name}, + QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again. @@ -361,7 +360,7 @@ retry_inflight_sync( Id, Ref, QueryOrBatch, - Name, + InflightTID, #{index := Index, resume_interval := ResumeT} = Data0 ) -> QueryOpts = #{}, @@ -375,7 +374,7 @@ retry_inflight_sync( {keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> - inflight_drop(Name, Ref, Id, Index), + inflight_drop(InflightTID, Ref, Id, Index), do_resume(Data0) end. @@ -451,11 +450,11 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que #{ id := Id, index := Index, - name := Name + inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, - QueryOpts = #{inflight_name => Name}, + QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Request, QueryOpts), IsAsync = is_async(Id), Data1 = cancel_flush_timer(Data0), @@ -489,9 +488,9 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu id := Id, index := Index, batch_size := BatchSize, - name := Name + inflight_tid := InflightTID } = Data0, - QueryOpts = #{inflight_name => Name}, + QueryOpts = #{inflight_name => InflightTID}, Result = call_query(configured, Id, Index, Batch, QueryOpts), IsAsync = is_async(Id), Data1 = cancel_flush_timer(Data0), @@ -639,17 +638,17 @@ apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _Q ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), - Name = maps:get(inflight_name, QueryOpts, undefined), + InflightTID = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( call_query_async, - case is_inflight_full(Name) of + case is_inflight_full(InflightTID) of true -> {async_return, inflight_full}; false -> ReplyFun = fun ?MODULE:reply_after_query/7, Ref = make_message_ref(), - Args = [self(), Id, Index, Name, Ref, Query], - ok = inflight_append(Name, Ref, Query, Id, Index), + Args = [self(), Id, Index, InflightTID, Ref, Query], + ok = inflight_append(InflightTID, Ref, Query, Id, Index), Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), {async_return, Result} end, @@ -661,25 +660,25 @@ apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _Qu ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), - Name = maps:get(inflight_name, QueryOpts, undefined), + InflightTID = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( call_batch_query_async, - case is_inflight_full(Name) of + case is_inflight_full(InflightTID) of true -> {async_return, inflight_full}; false -> ReplyFun = fun ?MODULE:batch_reply_after_query/7, Ref = make_message_ref(), - ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, + ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], - ok = inflight_append(Name, Ref, Batch, Id, Index), + ok = inflight_append(InflightTID, Ref, Batch, Id, Index), Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), Result) -> +reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. @@ -687,10 +686,10 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), true -> ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref, Id, Index) + drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) end. -batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> +batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. @@ -698,16 +697,16 @@ batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> true -> ?MODULE:block(Pid); false -> - drop_inflight_and_resume(Pid, Name, Ref, Id, Index) + drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) end. -drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> - case is_inflight_full(Name) of +drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) -> + case is_inflight_full(InflightTID) of true -> - inflight_drop(Name, Ref, Id, Index), + inflight_drop(InflightTID, Ref, Id, Index), ?MODULE:resume(Pid); false -> - inflight_drop(Name, Ref, Id, Index) + inflight_drop(InflightTID, Ref, Id, Index) end. %%============================================================================== @@ -757,82 +756,85 @@ get_first_n_from_queue(Q, N) -> %% the inflight queue for async query -define(MAX_SIZE_REF, -1). -define(SIZE_REF, -2). -inflight_new(Name, InfltWinSZ, Id, Index) -> - _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), - inflight_append(Name, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), +inflight_new(InfltWinSZ, Id, Index) -> + TableId = ets:new( + emqx_resource_worker_inflight_tab, + [ordered_set, public, {write_concurrency, true}] + ), + inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), %% we use this counter because we might deal with batches as %% elements. - inflight_append(Name, ?SIZE_REF, 0, Id, Index), - ok. + inflight_append(TableId, ?SIZE_REF, 0, Id, Index), + {ok, TableId}. -inflight_get_first(Name) -> - case ets:next(Name, ?MAX_SIZE_REF) of +inflight_get_first(InflightTID) -> + case ets:next(InflightTID, ?MAX_SIZE_REF) of '$end_of_table' -> empty; Ref -> - case ets:lookup(Name, Ref) of + case ets:lookup(InflightTID, Ref) of [Object] -> Object; [] -> %% it might have been dropped - inflight_get_first(Name) + inflight_get_first(InflightTID) end end. is_inflight_full(undefined) -> false; -is_inflight_full(Name) -> - [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?MAX_SIZE_REF), +is_inflight_full(InflightTID) -> + [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF), %% we consider number of batches rather than number of messages %% because one batch request may hold several messages. - Size = inflight_num_batches(Name), + Size = inflight_num_batches(InflightTID), Size >= MaxSize. -inflight_num_batches(Name) -> +inflight_num_batches(InflightTID) -> %% Note: we subtract 2 because there're 2 metadata rows that hold %% the maximum size value and the number of messages. MetadataRowCount = 2, - case ets:info(Name, size) of + case ets:info(InflightTID, size) of undefined -> 0; Size -> max(0, Size - MetadataRowCount) end. -inflight_num_msgs(Name) -> - [{_, Size}] = ets:lookup(Name, ?SIZE_REF), +inflight_num_msgs(InflightTID) -> + [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF), Size. inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; -inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> +inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> Batch = mark_as_sent(Batch0), - ets:insert(Name, {Ref, Batch}), + ets:insert(InflightTID, {Ref, Batch}), BatchSize = length(Batch), - ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), + ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ok; -inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> +inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> Query = mark_as_sent(Query0), - ets:insert(Name, {Ref, Query}), - ets:update_counter(Name, ?SIZE_REF, {2, 1}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), + ets:insert(InflightTID, {Ref, Query}), + ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ok; -inflight_append(Name, Ref, Data, _Id, _Index) -> - ets:insert(Name, {Ref, Data}), +inflight_append(InflightTID, Ref, Data, _Id, _Index) -> + ets:insert(InflightTID, {Ref, Data}), %% this is a metadata row being inserted; therefore, we don't bump %% the inflight metric. ok. inflight_drop(undefined, _, _Id, _Index) -> ok; -inflight_drop(Name, Ref, Id, Index) -> +inflight_drop(InflightTID, Ref, Id, Index) -> Count = - case ets:take(Name, Ref) of + case ets:take(InflightTID, Ref) of [{Ref, ?QUERY(_, _, _)}] -> 1; [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); _ -> 0 end, - Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), + Count > 0 andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), ok. %%============================================================================== @@ -868,13 +870,6 @@ assert_ok_result(R) -> queue_count(Q) -> replayq:count(Q). --spec name(id(), integer()) -> atom(). -name(Id, Index) -> - Mod = atom_to_list(?MODULE), - Id1 = binary_to_list(Id), - Index1 = integer_to_list(Index), - list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])). - disk_queue_dir(Id, Index) -> QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). diff --git a/changes/v5.0.14/fix-8583.en.md b/changes/v5.0.14/fix-8583.en.md new file mode 100644 index 000000000..f926d6e51 --- /dev/null +++ b/changes/v5.0.14/fix-8583.en.md @@ -0,0 +1 @@ +Potential leaks of atoms that could lead to a crash if a lot of resources were created have been removed. diff --git a/changes/v5.0.14/fix-8583.zh.md b/changes/v5.0.14/fix-8583.zh.md new file mode 100644 index 000000000..e2138d654 --- /dev/null +++ b/changes/v5.0.14/fix-8583.zh.md @@ -0,0 +1 @@ +如果创建了大量的资源,可能会导致崩溃的潜在的原子泄漏已经被删除。 From 734e6b9c96c1366ff7bbbf898ec65571baec98fb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 13 Jan 2023 10:20:26 +0100 Subject: [PATCH 2/2] chore: fix flaky test cases, log labels and review comments Co-authored-by: Thales Macedo Garitezi --- apps/emqx_resource/src/emqx_resource_manager.erl | 16 ++++++---------- apps/emqx_resource/src/emqx_resource_worker.erl | 4 ++-- apps/emqx_resource/test/emqx_resource_SUITE.erl | 10 +++++++++- .../v5.0.14/{fix-8583.en.md => fix-9730.en.md} | 0 .../v5.0.14/{fix-8583.zh.md => fix-9730.zh.md} | 0 5 files changed, 17 insertions(+), 13 deletions(-) rename changes/v5.0.14/{fix-8583.en.md => fix-9730.en.md} (100%) rename changes/v5.0.14/{fix-8583.zh.md => fix-9730.zh.md} (100%) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 06e6288e3..553802b78 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -428,14 +428,6 @@ read_cache(ResId) -> [] -> not_found end. -read_manager_pid_from_cache(ResId) -> - case read_cache(ResId) of - not_found -> - erlang:error(badarg); - {_, #data{pid = ManagerPid}} -> - ManagerPid - end. - delete_cache(ResId, MgrId) -> case get_owner(ResId) of MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> @@ -656,8 +648,12 @@ do_wait_for_ready(ResId, Retry) -> safe_call(ResId, Message, Timeout) -> try - ManagerPid = read_manager_pid_from_cache(ResId), - gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout}) + case read_cache(ResId) of + not_found -> + {error, not_found}; + {_, #data{pid = ManagerPid}} -> + gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout}) + end catch error:badarg -> {error, not_found}; diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 264556bb5..7840fd474 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -164,7 +164,7 @@ init({Id, Index, Opts}) -> emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - {ok, InflightTID} = inflight_new(InfltWinSZ, Id, Index), + InflightTID = inflight_new(InfltWinSZ, Id, Index), HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, @@ -765,7 +765,7 @@ inflight_new(InfltWinSZ, Id, Index) -> %% we use this counter because we might deal with batches as %% elements. inflight_append(TableId, ?SIZE_REF, 0, Id, Index), - {ok, TableId}. + TableId. inflight_get_first(InflightTID) -> case ets:next(InflightTID, ?MAX_SIZE_REF) of diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 4bea0a1ee..cdec414c9 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -944,7 +944,15 @@ t_create_dry_run_local(_) -> end, lists:seq(1, 10) ), - [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}). + case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of + false -> + %% Sleep to remove flakyness in test case. It take some time for + %% the ETS table to be cleared. + timer:sleep(2000), + [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}); + true -> + ok + end. create_dry_run_local_succ() -> case whereis(test_resource) of diff --git a/changes/v5.0.14/fix-8583.en.md b/changes/v5.0.14/fix-9730.en.md similarity index 100% rename from changes/v5.0.14/fix-8583.en.md rename to changes/v5.0.14/fix-9730.en.md diff --git a/changes/v5.0.14/fix-8583.zh.md b/changes/v5.0.14/fix-9730.zh.md similarity index 100% rename from changes/v5.0.14/fix-8583.zh.md rename to changes/v5.0.14/fix-9730.zh.md