Merge pull request #13181 from thalesmg/force-stop-connector-r57-20240604
fix(resource manager): force kill process if stuck when stopping/removing
This commit is contained in:
commit
ae0379f974
|
@ -1151,7 +1151,18 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, undefin
|
|||
post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) when
|
||||
ConfRootKey =:= ?ROOT_KEY_ACTIONS; ConfRootKey =:= ?ROOT_KEY_SOURCES
|
||||
->
|
||||
ok = uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf),
|
||||
case uninstall_bridge_v2(ConfRootKey, BridgeType, BridgeName, OldConf) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, timeout} ->
|
||||
throw(<<
|
||||
"Timed out trying to remove action or source. Please try again and,"
|
||||
" if the error persists, try disabling the connector before retrying."
|
||||
>>);
|
||||
{error, not_found} ->
|
||||
%% Should not happen, unless config is inconsistent.
|
||||
throw(<<"Referenced connector not found">>)
|
||||
end,
|
||||
ok = install_bridge_v2(ConfRootKey, BridgeType, BridgeName, NewConf),
|
||||
Bridges = emqx_utils_maps:deep_put(
|
||||
[BridgeType, BridgeName], emqx:get_config([ConfRootKey]), NewConf
|
||||
|
|
|
@ -879,6 +879,8 @@ handle_disable_enable(ConfRootKey, Id, Enable) ->
|
|||
?SERVICE_UNAVAILABLE(<<"request timeout">>);
|
||||
{error, timeout} ->
|
||||
?SERVICE_UNAVAILABLE(<<"request timeout">>);
|
||||
{error, Reason} when is_binary(Reason) ->
|
||||
?BAD_REQUEST(Reason);
|
||||
{error, Reason} ->
|
||||
?INTERNAL_ERROR(Reason)
|
||||
end
|
||||
|
|
|
@ -85,6 +85,7 @@
|
|||
get_allocated_resources_list/1,
|
||||
forget_allocated_resources/1,
|
||||
deallocate_resource/2,
|
||||
clean_allocated_resources/2,
|
||||
%% Get channel config from resource
|
||||
call_get_channel_config/3,
|
||||
% Call the format query result function
|
||||
|
|
|
@ -63,6 +63,10 @@
|
|||
%% Internal exports.
|
||||
-export([worker_resource_health_check/1, worker_channel_health_check/2]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([stop/2]).
|
||||
-endif.
|
||||
|
||||
% State record
|
||||
-record(data, {
|
||||
id,
|
||||
|
@ -254,7 +258,17 @@ remove(ResId) when is_binary(ResId) ->
|
|||
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
|
||||
remove(ResId, ClearMetrics) when is_binary(ResId) ->
|
||||
try
|
||||
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
|
||||
case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
|
||||
{error, timeout} ->
|
||||
?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
|
||||
action => remove,
|
||||
resource_id => ResId
|
||||
}),
|
||||
force_kill(ResId),
|
||||
ok;
|
||||
Res ->
|
||||
Res
|
||||
end
|
||||
after
|
||||
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
|
||||
%% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
|
||||
|
@ -287,9 +301,20 @@ start(ResId, Opts) ->
|
|||
%% @doc Stop the resource
|
||||
-spec stop(resource_id()) -> ok | {error, Reason :: term()}.
|
||||
stop(ResId) ->
|
||||
case safe_call(ResId, stop, ?T_OPERATION) of
|
||||
stop(ResId, ?T_OPERATION).
|
||||
|
||||
%% @doc Stop the resource, passing `Timeout' to the underlying `safe_call/3'.
%% When the call reports `{error, timeout}', a trace event is emitted, the
%% manager process is force-killed via `force_kill/1', and `ok' is returned.
%% Any other `{error, _}' result is handed back to the caller unchanged.
-spec stop(resource_id(), timeout()) -> ok | {error, Reason :: term()}.
stop(ResId, Timeout) ->
    StopResult = safe_call(ResId, stop, Timeout),
    case StopResult of
        {error, timeout} ->
            %% The manager did not answer in time: do not leave it stuck.
            ?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
                action => stop,
                resource_id => ResId
            }),
            force_kill(ResId),
            ok;
        ok ->
            ok;
        {error, _Reason} ->
            StopResult
    end.
|
||||
|
@ -406,6 +431,25 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
|
|||
get_error(_ResId, #{error := Error}) ->
|
||||
Error.
|
||||
|
||||
%% Forcefully terminates the process registered in gproc under `?NAME(ResId)',
%% if there is one, and then attempts to clean up the resources it allocated.
%%
%% `exit(Pid, kill)' only sends an exit signal; the target may still be running
%% (and allocating resources) for a short while afterwards.  We therefore
%% monitor the process and wait for its `DOWN' message before cleaning up, so
%% that the cleanup does not race with the process being killed.  The wait is
%% bounded so that the caller can never get stuck here.
%%
%% Always returns `ok'.
force_kill(ResId) ->
    case gproc:whereis_name(?NAME(ResId)) of
        undefined ->
            ok;
        Pid when is_pid(Pid) ->
            MRef = monitor(process, Pid),
            exit(Pid, kill),
            receive
                {'DOWN', MRef, process, Pid, _Reason} ->
                    ok
            after 5000 ->
                %% `kill' cannot be trapped, so this is not expected to happen;
                %% give up waiting and drop the monitor (and any late `DOWN').
                demonitor(MRef, [flush]),
                ok
            end,
            try_clean_allocated_resources(ResId),
            ok
    end.
|
||||
|
||||
%% Best-effort cleanup of the resources allocated by resource `ResId'.
%%
%% Looks up the cached `#data{}' record to find the callback module and asks
%% `emqx_resource:clean_allocated_resources/2' to release what was allocated.
%% If nothing usable is cached, there is nothing we can do.  A failing cleanup
%% never propagates to the caller, but it is reported through a trace/log
%% event (with stacktrace) instead of being silently discarded.
%%
%% Always returns `ok'.
try_clean_allocated_resources(ResId) ->
    case try_read_cache(ResId) of
        #data{mod = Mod} ->
            try
                emqx_resource:clean_allocated_resources(ResId, Mod)
            catch
                Class:Reason:Stacktrace ->
                    ?tp(warning, "failed_to_clean_allocated_resources", #{
                        resource_id => ResId,
                        class => Class,
                        reason => Reason,
                        stacktrace => Stacktrace
                    })
            end,
            ok;
        _ ->
            ok
    end.
|
||||
|
||||
%% Server start/stop callbacks
|
||||
|
||||
%% @doc Function called from the supervisor to actually start the server
|
||||
|
@ -737,7 +781,7 @@ maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
|
|||
Data.
|
||||
|
||||
stop_resource(#data{state = ResState, id = ResId} = Data) ->
|
||||
%% We don't care the return value of the Mod:on_stop/2.
|
||||
%% We don't care about the return value of `Mod:on_stop/2'.
|
||||
%% The callback mod should make sure the resource is stopped after on_stop/2
|
||||
%% is returned.
|
||||
HasAllocatedResources = emqx_resource:has_allocated_resources(ResId),
|
||||
|
|
|
@ -71,6 +71,16 @@ set_callback_mode(Mode) ->
|
|||
on_start(_InstId, #{create_error := true}) ->
|
||||
?tp(connector_demo_start_error, #{}),
|
||||
error("some error");
|
||||
on_start(InstId, #{create_error := {delay, Delay, Agent}} = Opts) ->
|
||||
?tp(connector_demo_start_delay, #{}),
|
||||
case emqx_utils_agent:get_and_update(Agent, fun(St) -> {St, called} end) of
|
||||
not_called ->
|
||||
emqx_resource:allocate_resource(InstId, i_should_be_deallocated, yep),
|
||||
timer:sleep(Delay),
|
||||
on_start(InstId, maps:remove(create_error, Opts));
|
||||
called ->
|
||||
on_start(InstId, maps:remove(create_error, Opts))
|
||||
end;
|
||||
on_start(InstId, #{name := Name} = Opts) ->
|
||||
Register = maps:get(register, Opts, false),
|
||||
StopError = maps:get(stop_error, Opts, false),
|
||||
|
@ -81,6 +91,9 @@ on_start(InstId, #{name := Name} = Opts) ->
|
|||
pid => spawn_counter_process(Name, Register)
|
||||
}}.
|
||||
|
||||
on_stop(_InstId, undefined) ->
|
||||
?tp(connector_demo_free_resources_without_state, #{}),
|
||||
ok;
|
||||
on_stop(_InstId, #{stop_error := true}) ->
|
||||
{error, stop_error};
|
||||
on_stop(InstId, #{pid := Pid}) ->
|
||||
|
|
|
@ -3189,6 +3189,43 @@ t_non_blocking_channel_health_check(_Config) ->
|
|||
),
|
||||
ok.
|
||||
|
||||
%% Test that `stop' forcefully stops the resource manager even if it's stuck on a sync
|
||||
%% call such as `on_start', and that the claimed resources, if any, are freed.
|
||||
t_force_stop(_Config) ->
    ?check_trace(
        begin
            %% The agent starts as `not_called'; the test connector's delayed
            %% `on_start' clause flips it to `called' on its first invocation.
            {ok, Agent} = emqx_utils_agent:start_link(not_called),
            %% `on_start' will sleep for 30 s, far longer than any timeout used below.
            ResourceConfig = #{
                name => test_resource,
                create_error => {delay, 30_000, Agent}
            },
            CreateOpts = #{
                health_check_interval => 100,
                start_timeout => 100
            },
            {ok, _} =
                create(
                    ?ID,
                    ?DEFAULT_RESOURCE_GROUP,
                    ?TEST_RESOURCE,
                    ResourceConfig,
                    CreateOpts
                ),
            %% The stop call is expected to time out and fall back to a forced stop.
            ?assertEqual(ok, emqx_resource_manager:stop(?ID, _Timeout = 100)),
            ok
        end,
        [
            log_consistency_prop(),
            fun(Trace) ->
                %% The delayed start was actually entered ...
                ?assertMatch([_ | _], ?of_kind(connector_demo_start_delay, Trace)),
                %% ... the forced-stop path was taken ...
                ?assertMatch(
                    [_ | _], ?of_kind("forcefully_stopping_resource_due_to_timeout", Trace)
                ),
                %% ... and the connector was asked to free resources without a state.
                ?assertMatch([_ | _], ?of_kind(connector_demo_free_resources_without_state, Trace)),
                ok
            end
        ]
    ),
    ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Similar to Elixir's [`Agent'](https://hexdocs.pm/elixir/Agent.html).
|
||||
|
||||
-module(emqx_utils_agent).

-behaviour(gen_server).

%% API
-export([start_link/1, get/1, get_and_update/2]).

%% `gen_server' API
-export([init/1, handle_call/3, handle_cast/2]).

%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------

-type state() :: term().

%% Receives the current state; returns `{Reply, NewState}'.
-type get_and_update_fn() :: fun((state()) -> {term(), state()}).

%% Internal call message carrying the function to run inside the agent process.
-record(get_and_update, {fn :: get_and_update_fn()}).

%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------

%% @doc Start an agent process, linked to the caller, holding `InitState'.
-spec start_link(state()) -> gen_server:start_ret().
start_link(InitState) ->
    gen_server:start_link(?MODULE, InitState, []).

%% @doc Return the agent's current state without changing it.
-spec get(gen_server:server_ref()) -> term().
get(ServerRef) ->
    Fn = fun(St) -> {St, St} end,
    gen_server:call(ServerRef, #get_and_update{fn = Fn}).

%% @doc Run `Fn' inside the agent process on the current state.
%% `Fn' must return `{Reply, NewState}': `Reply' is returned to the caller and
%% `NewState' becomes the agent's state.  Since `Fn' runs in the agent process,
%% calls are serialized, and an exception or a badly shaped return value in
%% `Fn' crashes the agent.
-spec get_and_update(gen_server:server_ref(), get_and_update_fn()) -> term().
get_and_update(ServerRef, Fn) ->
    gen_server:call(ServerRef, #get_and_update{fn = Fn}).

%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------

init(InitState) ->
    {ok, InitState}.

handle_call(#get_and_update{fn = Fn}, _From, State0) ->
    {Reply, State} = Fn(State0),
    {reply, Reply, State}.

%% The agent has no cast-based API; `handle_cast/2' is a mandatory `gen_server'
%% callback, so stray casts are ignored instead of crashing the agent with `undef'.
handle_cast(_Cast, State) ->
    {noreply, State}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal fns
|
||||
%%------------------------------------------------------------------------------
|
|
@ -0,0 +1,3 @@
|
|||
Now, if stopping a connector times out, the connector process is forcefully shut down.
|
||||
|
||||
Also improved the error messages returned when attempting to disable an action/source whose underlying connector is stuck.
|
Loading…
Reference in New Issue