diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 8b61b2ee1..7e7acbcd5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2058,14 +2058,41 @@ t_resource_manager_crash_after_subscriber_started(Config) -> ?tp(resource_manager_killed, #{}), ok end), + %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. Occurs frequently in CI... + + {Res, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, 10_000 ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), ok end, [] @@ -2089,14 +2116,40 @@ t_resource_manager_crash_before_subscriber_started(Config) -> ?tp(resource_manager_killed, #{}), ok end), + %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. Occurs frequently in CI... + {Res, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := kafka_consumer_just_client_stopped}, 10_000 ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), ok end, [] diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 4dc318205..a5c04160c 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -285,6 +285,11 @@ create_bridge(Config, Overrides) -> PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), emqx_bridge:create(Type, Name, PulsarConfig). +delete_bridge(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + emqx_bridge:remove(Type, Name). + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -1008,6 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) -> Producers =/= undefined, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] @@ -1039,6 +1046,8 @@ t_resource_manager_crash_before_producers_started(Config) -> #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7a54bfa97..388251c0b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -192,14 +192,13 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), try safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. - emqx_resource_manager_sup:delete_child(ResourceManagerPid) + emqx_resource_manager_sup:delete_child(ResId) end. %% @doc Stops and then starts an instance that was already running diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 9e86e6363..732d5e513 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -26,12 +26,12 @@ -export([init/1]). ensure_child(ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), + _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)), ok. -delete_child(Pid) -> - _ = supervisor:terminate_child(?MODULE, Pid), - _ = supervisor:delete_child(?MODULE, Pid), +delete_child(ResId) -> + _ = supervisor:terminate_child(?MODULE, ResId), + _ = supervisor:delete_child(?MODULE, ResId), ok. start_link() -> @@ -44,18 +44,19 @@ init([]) -> public, {read_concurrency, true} ]), - ChildSpecs = [ - #{ - id => emqx_resource_manager, - start => {emqx_resource_manager, start_link, []}, - restart => transient, - %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, - %% resource_manager's terminate callback calls resource on_stop - shutdown => infinity, - type => worker, - modules => [emqx_resource_manager] - } - ], - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, + ChildSpecs = [], + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. + +child_spec(ResId, Group, ResourceType, Config, Opts) -> + #{ + id => ResId, + start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + restart => transient, + %% never force kill a resource manager. + %% becasue otherwise it may lead to release leak, + %% resource_manager's terminate callback calls resource on_stop + shutdown => infinity, + type => worker, + modules => [emqx_resource_manager] + }.