From 0d8ffc0d59b8abca7f0b5330ecd0eacf2da5ac5b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 19 May 2023 18:28:22 +0200 Subject: [PATCH] fix(resource-manager): ensure no false creation Update is implemented as remove + create. If a dleete call is made while the create is in progress the remove call is likely to timeout too. This causes the follwing creation to falsely succeed, because there is alreay a running child under the supervisor. As a result, the resource is permanently removed after resource_manager eventually handles the remove call. --- .../src/emqx_resource_manager.erl | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index cd858bda3..9eb08e80d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,18 @@ % State record -record(data, { - id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid + id, + group, + mod, + callback_mode, + query_mode, + config, + opts, + status, + state, + error, + pid, + extra }). -type data() :: #data{}. @@ -181,7 +192,15 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). + ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), + try + safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + after + %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process + %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. + %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. + emqx_resource_manager_sup:delete_child(ResourceManagerPid) + end. %% @doc Stops and then starts an instance that was already running -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. @@ -439,8 +458,10 @@ health_check_actions(Data) -> [{state_timeout, health_check_interval(Data#data.opts), health_check}]. handle_remove_event(From, ClearMetrics, Data) -> - _ = stop_resource(Data), + %% stop the buffer workers first, brutal_kill, so it should be fast ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), + %% no stop the resource, this can be slow + _ = stop_resource(Data), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok