diff --git a/Makefile b/Makefile index b579a078d..8f214a691 100644 --- a/Makefile +++ b/Makefile @@ -104,7 +104,7 @@ APPS=$(shell $(SCRIPTS)/find-apps.sh) .PHONY: $(APPS:%=%-ct) define gen-app-ct-target -$1-ct: $(REBAR) merge-config +$1-ct: $(REBAR) merge-config clean-test-cluster-config $(eval SUITES := $(shell $(SCRIPTS)/find-suites.sh $1)) ifneq ($(SUITES),) @ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \ @@ -127,7 +127,7 @@ endef $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app)))) .PHONY: ct-suite -ct-suite: $(REBAR) merge-config +ct-suite: $(REBAR) merge-config clean-test-cluster-config ifneq ($(TESTCASE),) ifneq ($(GROUP),) $(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) --group $(GROUP) @@ -294,3 +294,7 @@ fmt: $(REBAR) @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}' @$(SCRIPTS)/erlfmt -w 'rebar.config.erl' @mix format + +.PHONY: clean-test-cluster-config +clean-test-cluster-config: + @rm -f apps/emqx_conf/data/configs/cluster.hocon || true diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl index 0190ab936..652b634ca 100644 --- a/apps/emqx/test/emqx_authentication_SUITE.erl +++ b/apps/emqx/test/emqx_authentication_SUITE.erl @@ -98,6 +98,8 @@ init_per_suite(Config) -> LogLevel = emqx_logger:get_primary_log_level(), ok = emqx_logger:set_log_level(debug), application:set_env(ekka, strict_mode, true), + emqx_config:erase_all(), + emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), [{log_level, LogLevel} | Config]. 
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1fbd6902e..cbbe937aa 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -227,9 +227,13 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - {fun emqx_bridge_resource:remove/4, Removed}, - {fun emqx_bridge_resource:create/4, Added}, - {fun emqx_bridge_resource:update/4, Updated} + #{action => fun emqx_bridge_resource:remove/4, data => Removed}, + #{ + action => fun emqx_bridge_resource:create/4, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => fun emqx_bridge_resource:update/4, data => Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -345,7 +349,8 @@ perform_bridge_changes(Tasks) -> perform_bridge_changes([], Result) -> Result; -perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> + OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), Result = maps:fold( fun ({_Type, _Name}, _Conf, {error, Reason}) -> @@ -359,9 +364,21 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> end; ({Type, Name}, Conf, _) -> ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, Conf, ResOpts) of + try Action(Type, Name, Conf, ResOpts) of {error, Reason} -> {error, Reason}; Return -> Return + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "bridge_config_update_exception", + kind => Kind, + error => Error, + type => Type, + name => Name, + stacktrace => Stacktrace + }), + OnException(Type, Name, Conf, ResOpts), + erlang:raise(Kind, Error, Stacktrace) end end, Result0, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src 
b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 6c103f73b..64811c91c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index f7958af81..c0de23d94 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -101,6 +101,10 @@ " the connection parameters." ). +%% Allocatable resources +-define(kafka_client_id, kafka_client_id). +-define(kafka_subscriber_id, kafka_subscriber_id). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -140,6 +144,7 @@ on_start(ResourceId, Config) -> Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] end, ClientOpts = add_ssl_opts(ClientOpts0, SSL), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID), case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of ok -> ?tp( @@ -163,7 +168,21 @@ on_start(ResourceId, Config) -> start_consumer(Config, ResourceId, ClientID). -spec on_stop(resource_id(), state()) -> ok. 
-on_stop(_ResourceID, State) -> +on_stop(ResourceId, _State = undefined) -> + case emqx_resource:get_allocated_resources(ResourceId) of + #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} -> + stop_subscriber(SubscriberId), + stop_client(ClientID), + ?tp(kafka_consumer_subcriber_and_client_stopped, #{}), + ok; + #{?kafka_client_id := ClientID} -> + stop_client(ClientID), + ?tp(kafka_consumer_just_client_stopped, #{}), + ok; + _ -> + ok + end; +on_stop(_ResourceId, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID @@ -333,6 +352,9 @@ start_consumer(Config, ResourceId, ClientID) -> %% spawns one worker for each assigned topic-partition %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), + ?tp(kafka_consumer_about_to_start_subscriber, #{}), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId), + ?tp(kafka_consumer_subscriber_allocated, #{}), case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> ?tp( @@ -359,7 +381,13 @@ start_consumer(Config, ResourceId, ClientID) -> stop_subscriber(SubscriberId) -> _ = log_when_error( fun() -> - emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + try + emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + catch + exit:{noproc, _} -> + %% may happen when node is shutting down + ok + end end, #{ msg => "failed_to_delete_kafka_subscriber", @@ -443,16 +471,22 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> end. are_subscriber_workers_alive(SubscriberId) -> - Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), - case lists:keyfind(SubscriberId, 1, Children) of - false -> - false; - {_, Pid, _, _} -> - Workers = brod_group_subscriber_v2:get_workers(Pid), - %% we can't enforce the number of partitions on a single - %% node, as the group might be spread across an emqx - %% cluster. 
- lists:all(fun is_process_alive/1, maps:values(Workers)) + try + Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), + case lists:keyfind(SubscriberId, 1, Children) of + false -> + false; + {_, Pid, _, _} -> + Workers = brod_group_subscriber_v2:get_workers(Pid), + %% we can't enforce the number of partitions on a single + %% node, as the group might be spread across an emqx + %% cluster. + lists:all(fun is_process_alive/1, maps:values(Workers)) + end + catch + exit:{shutdown, _} -> + %% may happen if node is shutting down + false end. log_when_error(Fun, Log) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 7bee2c70d..8b8337b09 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -23,6 +23,11 @@ -include_lib("emqx/include/logger.hrl"). +%% Allocatable resources +-define(kafka_resource_id, kafka_resource_id). +-define(kafka_client_id, kafka_client_id). +-define(kafka_producers, kafka_producers). + %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). 
@@ -46,9 +51,11 @@ on_start(InstId, Config) -> } = Config, BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, connect_timeout => ConnTimeout, @@ -86,6 +93,7 @@ on_start(InstId, Config) -> WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> + ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, @@ -120,28 +128,63 @@ on_start(InstId, Config) -> ) end. -on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, +on_stop(InstanceId, _State) -> + case emqx_resource:get_allocated_resources(InstanceId) of #{ - msg => "failed_to_delete_kafka_producer", - client_id => ClientID - } - ), - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientID - } - ), - with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceID) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - client_id => ClientID - } - ). 
+ ?kafka_client_id := ClientId, + ?kafka_producers := Producers, + ?kafka_resource_id := ResourceId + } -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + _ -> + ok + end. on_query( _InstId, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 6d18bbdf2..7e7acbcd5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -59,7 +59,9 @@ only_once_tests() -> t_cluster_group, t_node_joins_existing_cluster, t_cluster_node_down, - t_multiple_topic_mappings + t_multiple_topic_mappings, + t_resource_manager_crash_after_subscriber_started, + t_resource_manager_crash_before_subscriber_started ]. 
init_per_suite(Config) -> @@ -333,6 +335,7 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), delete_all_bridges(), + emqx_config:delete_override_conf_files(), KafkaTopic = << (atom_to_binary(TestCase))/binary, @@ -1117,6 +1120,24 @@ stop_async_publisher(Pid) -> end, ok. +kill_resource_managers() -> + ct:pal("gonna kill resource managers"), + lists:foreach( + fun({_, Pid, _, _}) -> + ct:pal("terminating resource manager ~p", [Pid]), + Ref = monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, + ok + end, + supervisor:which_children(emqx_resource_manager_sup) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -2019,3 +2040,118 @@ t_begin_offset_earliest(Config) -> end ), ok. + +t_resource_manager_crash_after_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_subscriber_allocated}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_subscriber_started} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. 
Occurs frequently in CI... + + {Res, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, + 10_000 + ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), + ok + end, + [] + ), + ok. + +t_resource_manager_crash_before_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_client_started}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_about_to_start_subscriber} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. Occurs frequently in CI... + {Res, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_just_client_stopped}, + 10_000 + ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... 
+ ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index d1a29fffe..ad41c9904 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -446,6 +446,8 @@ t_failed_creation_then_fix(Config) -> ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok = emqx_bridge_resource:remove(BridgeId), delete_all_bridges(), ok. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 5906cc57a..b8157d4fc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -60,6 +60,10 @@ sync_timeout := emqx_schema:duration_ms() }. +%% Allocatable resources +-define(pulsar_client_id, pulsar_client_id). +-define(pulsar_producers, pulsar_producers). 
+ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -81,7 +85,7 @@ on_start(InstanceId, Config) -> } = Config, Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), - ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ @@ -119,7 +123,7 @@ on_start(InstanceId, Config) -> -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> case emqx_resource:get_allocated_resources(InstanceId) of - #{pulsar_client_id := ClientId, pulsar_producers := Producers} -> + #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} -> stop_producers(ClientId, Producers), stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ @@ -128,7 +132,7 @@ on_stop(InstanceId, _State) -> pulsar_producers => Producers }), ok; - #{pulsar_client_id := ClientId} -> + #{?pulsar_client_id := ClientId} -> stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ instance_id => InstanceId, @@ -340,7 +344,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers), ?tp(pulsar_producer_producers_allocated, #{}), State = #{ pulsar_client_id => ClientId, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 3605baaab..a5c04160c 
100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -285,6 +285,11 @@ create_bridge(Config, Overrides) -> PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), emqx_bridge:create(Type, Name, PulsarConfig). +delete_bridge(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + emqx_bridge:remove(Type, Name). + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -541,8 +546,14 @@ kill_resource_managers() -> lists:foreach( fun({_, Pid, _, _}) -> ct:pal("terminating resource manager ~p", [Pid]), - %% sys:terminate(Pid, stop), + Ref = monitor(process, Pid), exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, ok end, supervisor:which_children(emqx_resource_manager_sup) @@ -1002,6 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) -> Producers =/= undefined, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] @@ -1033,6 +1046,8 @@ t_resource_manager_crash_before_producers_started(Config) -> #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 10f1de6c4..840c6cfec 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -533,7 +533,7 @@ clean_allocated_resources(ResourceId, ResourceMod) -> true -> %% The resource entries in the ETS table are erased inside %% `call_stop' if the call is successful. 
- ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined), + ok = call_stop(ResourceId, ResourceMod, _ResourceState = undefined), ok; false -> ok diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index fa8d53903..d7c2a4bd3 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -192,14 +192,13 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), try safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. - emqx_resource_manager_sup:delete_child(ResourceManagerPid) + emqx_resource_manager_sup:delete_child(ResId) end. %% @doc Stops and then starts an instance that was already running diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 9e86e6363..732d5e513 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -26,12 +26,12 @@ -export([init/1]). ensure_child(ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), + _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)), ok. 
-delete_child(Pid) -> - _ = supervisor:terminate_child(?MODULE, Pid), - _ = supervisor:delete_child(?MODULE, Pid), +delete_child(ResId) -> + _ = supervisor:terminate_child(?MODULE, ResId), + _ = supervisor:delete_child(?MODULE, ResId), ok. start_link() -> @@ -44,18 +44,19 @@ init([]) -> public, {read_concurrency, true} ]), - ChildSpecs = [ - #{ - id => emqx_resource_manager, - start => {emqx_resource_manager, start_link, []}, - restart => transient, - %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, - %% resource_manager's terminate callback calls resource on_stop - shutdown => infinity, - type => worker, - modules => [emqx_resource_manager] - } - ], - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, + ChildSpecs = [], + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. + +child_spec(ResId, Group, ResourceType, Config, Opts) -> + #{ + id => ResId, + start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + restart => transient, + %% never force kill a resource manager. + %% because otherwise it may lead to release leak, + %% resource_manager's terminate callback calls resource on_stop + shutdown => infinity, + type => worker, + modules => [emqx_resource_manager] + }. diff --git a/changes/ee/feat-10778.en.md b/changes/ee/feat-10778.en.md index 3084d2959..2f2131a8a 100644 --- a/changes/ee/feat-10778.en.md +++ b/changes/ee/feat-10778.en.md @@ -1 +1 @@ -Refactored Pulsar Producer bridge to avoid leaking resources during crashes. +Refactored Pulsar Producer bridge to avoid leaking resources during crashes at creation. diff --git a/changes/ee/feat-10813.en.md b/changes/ee/feat-10813.en.md new file mode 100644 index 000000000..b36039f4c --- /dev/null +++ b/changes/ee/feat-10813.en.md @@ -0,0 +1 @@ +Refactored Kafka Producer and Consumer bridges to avoid leaking resources during crashes at creation.