diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 8340bf589..9bd6b1390 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,6 +85,7 @@ get_allocated_resources_list/1, forget_allocated_resources/1, deallocate_resource/2, + clean_allocated_resources/2, %% Get channel config from resource call_get_channel_config/3, % Call the format query result function diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a09a6e9f8..4763094d0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -63,6 +63,10 @@ %% Internal exports. -export([worker_resource_health_check/1, worker_channel_health_check/2]). +-ifdef(TEST). +-export([stop/2]). +-endif. + % State record -record(data, { id, @@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) -> -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> try - safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) + case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => remove, + resource_id => ResId + }), + force_kill(ResId), + ok; + Res -> + Res + end after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition. @@ -287,9 +301,20 @@ start(ResId, Opts) -> %% @doc Stop the resource -spec stop(resource_id()) -> ok | {error, Reason :: term()}. stop(ResId) -> - case safe_call(ResId, stop, ?T_OPERATION) of + stop(ResId, ?T_OPERATION). + +-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}. +stop(ResId, Timeout) -> + case safe_call(ResId, stop, Timeout) of ok -> ok; + {error, timeout} -> + ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{ + action => stop, + resource_id => ResId + }), + force_kill(ResId), + ok; {error, _Reason} = Error -> Error end. @@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when get_error(_ResId, #{error := Error}) -> Error. +force_kill(ResId) -> + case gproc:whereis_name(?NAME(ResId)) of + undefined -> + ok; + Pid when is_pid(Pid) -> + exit(Pid, kill), + try_clean_allocated_resources(ResId), + ok + end. + +try_clean_allocated_resources(ResId) -> + case try_read_cache(ResId) of + #data{mod = Mod} -> + catch emqx_resource:clean_allocated_resources(ResId, Mod), + ok; + _ -> + ok + end. + %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server @@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) -> Data. stop_resource(#data{state = ResState, id = ResId} = Data) -> - %% We don't care the return value of the Mod:on_stop/2. + %% We don't care about the return value of `Mod:on_stop/2'. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. HasAllocatedResources = emqx_resource:has_allocated_resources(ResId), diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 754727e8c..0fc11cc66 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -71,6 +71,16 @@ set_callback_mode(Mode) -> on_start(_InstId, #{create_error := true}) -> ?tp(connector_demo_start_error, #{}), error("some error"); +on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) -> + ?tp(connector_demo_start_delay, #{}), + case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of + not_called -> + emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep), + timer:sleep(Delay), + on_start(InstId, maps:remove(create_error, Opts)); + called -> + on_start(InstId, maps:remove(create_error, Opts)) + end; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), StopError = maps:get(stop_error, Opts, false), @@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) -> pid => spawn_counter_process(Name, Register) }}. +on_stop(_InstId, undefined) -> + ?tp(connector_demo_free_resources_without_state, #{}), + ok; on_stop(_InstId, #{stop_error := true}) -> {error, stop_error}; on_stop(InstId, #{pid := Pid}) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 05a2f711d..764c65e6f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) -> ), ok. +%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync +%% call such as `on_start', and that the claimed resources, if any, are freed. +t_force_stop(_Config) -> + ?check_trace( + begin + {ok, Agent} = emqx_utils_agent:start_link(not_called), + {ok, _} = + create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{ + name => test_resource, + create_error => {delay, 30_000, Agent} + }, + #{ + health_check_interval => 100, + start_timeout => 100 + } + ), + ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)), + ok + end, + [ + log_consistency_prop(), + fun(Trace) -> + ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)), + ?assertMatch( + [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace) + ), + ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)), + ok + end + ] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/test/emqx_utils_agent.erl b/apps/emqx_utils/test/emqx_utils_agent.erl new file mode 100644 index 000000000..4280a491f --- /dev/null +++ b/apps/emqx_utils/test/emqx_utils_agent.erl @@ -0,0 +1,66 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html). + +-module(emqx_utils_agent). + +%% API +-export([start_link/1, get/1, get_and_update/2]). + +%% `gen_server' API +-export([init/1, handle_call/3]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-type state() :: term(). + +-type get_and_update_fn() :: fun((state()) -> {term(), state()}). + +-record(get_and_update, {fn :: get_and_update_fn()}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link(state()) -> gen_server:start_ret(). +start_link(InitState) -> + gen_server:start_link(?MODULE, InitState, []). + +-spec get(gen_server:server_ref()) -> term(). +get(ServerRef) -> + Fn = fun(St) -> {St, St} end, + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term(). +get_and_update(ServerRef, Fn) -> + gen_server:call(ServerRef, #get_and_update{fn = Fn}). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(InitState) -> + {ok, InitState}. + +handle_call(#get_and_update{fn = Fn}, _From, State0) -> + {Reply, State} = Fn(State0), + {reply, Reply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------