diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 50a25620c..1a3781a65 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -59,7 +59,7 @@ ]). % Server --export([start_link/5]). +-export([start_link/5, where/1]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). @@ -156,6 +156,9 @@ %% API %%------------------------------------------------------------------------------ +where(ResId) -> + gproc:where(?NAME(ResId)). + %% @doc Called from emqx_resource when starting a resource instance. %% %% Triggers the emqx_resource_manager_sup supervisor to actually create @@ -277,17 +280,7 @@ remove(ResId) when is_binary(ResId) -> -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> try - case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of - {error, timeout} -> - ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ - action => remove, - resource_id => ResId - }), - force_kill(ResId), - ok; - Res -> - Res - end + do_remove(ResId, ClearMetrics) after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition. @@ -295,6 +288,31 @@ remove(ResId, ClearMetrics) when is_binary(ResId) -> emqx_resource_manager_sup:delete_child(ResId) end. +do_remove(ResId, ClearMetrics) -> + case gproc:whereis_name(?NAME(ResId)) of + undefined -> + ok; + Pid when is_pid(Pid) -> + MRef = monitor(process, Pid), + case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => remove, + resource_id => ResId + }), + force_kill(ResId, MRef), + ok; + ok -> + receive + {'DOWN', MRef, process, Pid, _} -> + ok + end, + ok; + Res -> + Res + end + end. + %% @doc Stops and then starts an instance that was already running -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) when is_binary(ResId) -> @@ -332,7 +350,7 @@ stop(ResId, Timeout) -> action => stop, resource_id => ResId }), - force_kill(ResId), + force_kill(ResId, _MRef = undefined), ok; {error, _Reason} = Error -> Error @@ -469,12 +487,21 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when get_error(_ResId, #{error := Error}) -> Error. -force_kill(ResId) -> +force_kill(ResId, MRef0) -> case gproc:whereis_name(?NAME(ResId)) of undefined -> ok; Pid when is_pid(Pid) -> + MRef = + case MRef0 of + undefined -> monitor(process, Pid); + _ -> MRef0 + end, exit(Pid, kill), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + end, try_clean_allocated_resources(ResId), ok end.