From 297b8b380dc9c1587fe070e2b57378b57c0a1bcb Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 16:25:22 -0300 Subject: [PATCH 01/10] docs: improve changelog --- changes/ee/feat-10778.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ee/feat-10778.en.md b/changes/ee/feat-10778.en.md index 3084d2959..2f2131a8a 100644 --- a/changes/ee/feat-10778.en.md +++ b/changes/ee/feat-10778.en.md @@ -1 +1 @@ -Refactored Pulsar Producer bridge to avoid leaking resources during crashes. +Refactored Pulsar Producer bridge to avoid leaking resources during crashes at creation. From 34be8b3a00e1ba8ca4d5933c8ae33b4b7ada47ba Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 25 May 2023 15:05:18 -0300 Subject: [PATCH 02/10] ci: ensure `cluster.hocon` is removed before running tests --- Makefile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 785458601..200fa7c71 100644 --- a/Makefile +++ b/Makefile @@ -104,7 +104,7 @@ APPS=$(shell $(SCRIPTS)/find-apps.sh) .PHONY: $(APPS:%=%-ct) define gen-app-ct-target -$1-ct: $(REBAR) merge-config +$1-ct: $(REBAR) merge-config clean-test-cluster-config $(eval SUITES := $(shell $(SCRIPTS)/find-suites.sh $1)) ifneq ($(SUITES),) @ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \ @@ -127,7 +127,7 @@ endef $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app)))) .PHONY: ct-suite -ct-suite: $(REBAR) merge-config +ct-suite: $(REBAR) merge-config clean-test-cluster-config ifneq ($(TESTCASE),) ifneq ($(GROUP),) $(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) --group $(GROUP) @@ -294,3 +294,7 @@ fmt: $(REBAR) @$(SCRIPTS)/erlfmt -w '{apps,lib-ee}/*/{src,include,test}/**/*.{erl,hrl,app.src}' @$(SCRIPTS)/erlfmt -w 'rebar.config.erl' @mix format + +.PHONY: clean-test-cluster-config +clean-test-cluster-config: + @rm apps/emqx_conf/data/configs/cluster.hocon || true From 42b37690c7666a3def04659b687aa5b6da23cffa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 16:27:31 -0300 Subject: [PATCH 03/10] refactor(pulsar): use macros for allocatable resources --- .../src/emqx_bridge_pulsar_impl_producer.erl | 12 ++++++++---- .../test/emqx_bridge_pulsar_impl_producer_SUITE.erl | 8 +++++++- apps/emqx_resource/src/emqx_resource.erl | 2 +- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 5906cc57a..b8157d4fc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -60,6 +60,10 @@ sync_timeout := emqx_schema:duration_ms() }. +%% Allocatable resources +-define(pulsar_client_id, pulsar_client_id). +-define(pulsar_producers, pulsar_producers). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -81,7 +85,7 @@ on_start(InstanceId, Config) -> } = Config, Servers = format_servers(Servers0), ClientId = make_client_id(InstanceId, BridgeName), - ok = emqx_resource:allocate_resource(InstanceId, pulsar_client_id, ClientId), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_client_id, ClientId), SSLOpts = emqx_tls_lib:to_client_opts(SSL), ConnectTimeout = maps:get(connect_timeout, Config, timer:seconds(5)), ClientOpts = #{ @@ -119,7 +123,7 @@ on_start(InstanceId, Config) -> -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> case emqx_resource:get_allocated_resources(InstanceId) of - #{pulsar_client_id := ClientId, pulsar_producers := Producers} -> + #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} -> stop_producers(ClientId, Producers), stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ @@ -128,7 +132,7 @@ on_stop(InstanceId, _State) -> pulsar_producers => Producers }), ok; - #{pulsar_client_id := ClientId} -> + #{?pulsar_client_id := ClientId} -> stop_client(ClientId), ?tp(pulsar_bridge_stopped, #{ instance_id => InstanceId, @@ -340,7 +344,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstanceId, pulsar_producers, Producers), + ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers), ?tp(pulsar_producer_producers_allocated, #{}), State = #{ pulsar_client_id => ClientId, diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 3605baaab..4dc318205 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -541,8 +541,14 @@ kill_resource_managers() -> lists:foreach( fun({_, Pid, _, _}) -> ct:pal("terminating resource manager ~p", [Pid]), - %% sys:terminate(Pid, stop), + Ref = monitor(process, Pid), exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, ok end, supervisor:which_children(emqx_resource_manager_sup) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 10f1de6c4..840c6cfec 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -533,7 +533,7 @@ clean_allocated_resources(ResourceId, ResourceMod) -> true -> %% The resource entries in the ETS table are erased inside %% `call_stop' if the call is successful. - ok = emqx_resource:call_stop(ResourceId, ResourceMod, _ResourceState = undefined), + ok = call_stop(ResourceId, ResourceMod, _ResourceState = undefined), ok; false -> ok From 5df7314255d93ef497803783b36200e7da638ac2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 16:28:08 -0300 Subject: [PATCH 04/10] feat(kafka): ensure allocated resources are removed on failures --- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka_impl_consumer.erl | 24 +++++- .../src/emqx_bridge_kafka_impl_producer.erl | 85 ++++++++++++++----- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 84 +++++++++++++++++- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 2 + changes/ee/feat-10813.en.md | 1 + 6 files changed, 174 insertions(+), 24 deletions(-) create mode 100644 changes/ee/feat-10813.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 6c103f73b..64811c91c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index f7958af81..225f90c18 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -101,6 +101,10 @@ " the connection parameters." ). +%% Allocatable resources +-define(kafka_client_id, kafka_client_id). +-define(kafka_subscriber_id, kafka_subscriber_id). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -140,6 +144,7 @@ on_start(ResourceId, Config) -> Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] end, ClientOpts = add_ssl_opts(ClientOpts0, SSL), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID), case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of ok -> ?tp( @@ -163,7 +168,21 @@ on_start(ResourceId, Config) -> start_consumer(Config, ResourceId, ClientID). -spec on_stop(resource_id(), state()) -> ok. -on_stop(_ResourceID, State) -> +on_stop(ResourceId, _State = undefined) -> + case emqx_resource:get_allocated_resources(ResourceId) of + #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} -> + stop_subscriber(SubscriberId), + stop_client(ClientID), + ?tp(kafka_consumer_subcriber_and_client_stopped, #{}), + ok; + #{?kafka_client_id := ClientID} -> + stop_client(ClientID), + ?tp(kafka_consumer_just_client_stopped, #{}), + ok; + _ -> + ok + end; +on_stop(_ResourceId, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID @@ -333,6 +352,9 @@ start_consumer(Config, ResourceId, ClientID) -> %% spawns one worker for each assigned topic-partition %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), + ?tp(kafka_consumer_about_to_start_subscriber, #{}), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId), + ?tp(kafka_consumer_subscriber_allocated, #{}), case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> ?tp( diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 7bee2c70d..8b8337b09 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -23,6 +23,11 @@ -include_lib("emqx/include/logger.hrl"). +%% Allocatable resources +-define(kafka_resource_id, kafka_resource_id). +-define(kafka_client_id, kafka_client_id). +-define(kafka_producers, kafka_producers). + %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). @@ -46,9 +51,11 @@ on_start(InstId, Config) -> } = Config, BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, connect_timeout => ConnTimeout, @@ -86,6 +93,7 @@ on_start(InstId, Config) -> WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> + ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, @@ -120,28 +128,63 @@ on_start(InstId, Config) -> ) end. -on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, +on_stop(InstanceId, _State) -> + case emqx_resource:get_allocated_resources(InstanceId) of #{ - msg => "failed_to_delete_kafka_producer", - client_id => ClientID - } - ), - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientID - } - ), - with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceID) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - client_id => ClientID - } - ). + ?kafka_client_id := ClientId, + ?kafka_producers := Producers, + ?kafka_resource_id := ResourceId + } -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + _ -> + ok + end. on_query( _InstId, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index c17d21635..bffd4caa4 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -59,7 +59,9 @@ only_once_tests() -> t_cluster_group, t_node_joins_existing_cluster, t_cluster_node_down, - t_multiple_topic_mappings + t_multiple_topic_mappings, + t_resource_manager_crash_after_subscriber_started, + t_resource_manager_crash_before_subscriber_started ]. init_per_suite(Config) -> @@ -1118,6 +1120,24 @@ stop_async_publisher(Pid) -> end, ok. +kill_resource_managers() -> + ct:pal("gonna kill resource managers"), + lists:foreach( + fun({_, Pid, _, _}) -> + ct:pal("terminating resource manager ~p", [Pid]), + Ref = monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, + ok + end, + supervisor:which_children(emqx_resource_manager_sup) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -2024,3 +2044,65 @@ t_begin_offset_earliest(Config) -> end ), ok. + +t_resource_manager_crash_after_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_subscriber_allocated}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_subscriber_started} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, + 10_000 + ), + ok + end, + [] + ), + ok. + +t_resource_manager_crash_before_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_client_started}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_about_to_start_subscriber} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_just_client_stopped}, + 10_000 + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index d1a29fffe..ad41c9904 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -446,6 +446,8 @@ t_failed_creation_then_fix(Config) -> ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok = emqx_bridge_resource:remove(BridgeId), delete_all_bridges(), ok. diff --git a/changes/ee/feat-10813.en.md b/changes/ee/feat-10813.en.md new file mode 100644 index 000000000..b36039f4c --- /dev/null +++ b/changes/ee/feat-10813.en.md @@ -0,0 +1 @@ +Refactored Kafka Producer and Consumer bridges to avoid leaking resources during crashes at creation. From 0ca3f51503f6381dcaac69442d9a2e67c5e2a4ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 17:47:26 -0300 Subject: [PATCH 05/10] fix(kafka): improve shutdown and health check logs during shutdown --- .../src/emqx_bridge_kafka_impl_consumer.erl | 34 +++++++++++++------ .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 1 + 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 225f90c18..c0de23d94 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -381,7 +381,13 @@ start_consumer(Config, ResourceId, ClientID) -> stop_subscriber(SubscriberId) -> _ = log_when_error( fun() -> - emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + try + emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + catch + exit:{noproc, _} -> + %% may happen when node is shutting down + ok + end end, #{ msg => "failed_to_delete_kafka_subscriber", @@ -465,16 +471,22 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> end. are_subscriber_workers_alive(SubscriberId) -> - Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), - case lists:keyfind(SubscriberId, 1, Children) of - false -> - false; - {_, Pid, _, _} -> - Workers = brod_group_subscriber_v2:get_workers(Pid), - %% we can't enforce the number of partitions on a single - %% node, as the group might be spread across an emqx - %% cluster. - lists:all(fun is_process_alive/1, maps:values(Workers)) + try + Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), + case lists:keyfind(SubscriberId, 1, Children) of + false -> + false; + {_, Pid, _, _} -> + Workers = brod_group_subscriber_v2:get_workers(Pid), + %% we can't enforce the number of partitions on a single + %% node, as the group might be spread across an emqx + %% cluster. + lists:all(fun is_process_alive/1, maps:values(Workers)) + end + catch + exit:{shutdown, _} -> + %% may happen if node is shutting down + false end. log_when_error(Fun, Log) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index bffd4caa4..194ca95d6 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -335,6 +335,7 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), delete_all_bridges(), + emqx_config:delete_override_conf_files(), KafkaTopic = << (atom_to_binary(TestCase))/binary, From cb34bc5c46ddf5a369a77ecbf50089494eb1ea35 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 13:13:54 -0300 Subject: [PATCH 06/10] test(kafka_consumer): attempt to stabilize cluster tests Example failure: https://github.com/emqx/emqx/actions/runs/5070096710/jobs/9105822319#step:7:515 The attempt here is to setup the spy as early as possible, before the bridge starts, so we avoid missing rebalancing events. --- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 61 +++++++++---------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 194ca95d6..8b61b2ee1 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -997,36 +997,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> Assignments ). -setup_group_subscriber_spy(Node) -> +setup_group_subscriber_spy_fn() -> TestPid = self(), - ok = erpc:call( - Node, - fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - TestPid ! - {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok - end - ). + fun() -> + ok = meck:new(brod_group_subscriber_v2, [ + passthrough, no_link, no_history, non_strict + ]), + ok = meck:expect( + brod_group_subscriber_v2, + assignments_received, + fun(Pid, MemberId, GenerationId, TopicAssignments) -> + ?tp( + kafka_assignment, + #{ + node => node(), + pid => Pid, + member_id => MemberId, + generation_id => GenerationId, + topic_assignments => TopicAssignments + } + ), + TestPid ! + {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}}, + meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) + end + ), + ok + end. wait_for_cluster_rpc(Node) -> %% need to wait until the config handler is ready after @@ -1070,6 +1067,7 @@ cluster(Config) -> _ -> ct_slave end, + ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(), Cluster = emqx_common_test_helpers:emqx_cluster( [core, core], [ @@ -1083,6 +1081,7 @@ cluster(Config) -> {env_handler, fun (emqx) -> application:set_env(emqx, boot_modules, [broker, router]), + ExtraEnvHandlerHook(), ok; (emqx_conf) -> ok; @@ -1701,7 +1700,6 @@ t_cluster_group(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes), @@ -1778,7 +1776,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N1]), ok = emqx_common_test_helpers:stop_slave(N1) end), - setup_group_subscriber_spy(N1), {{ok, _}, {ok, _}} = ?wait_async_action( erpc:call(N1, fun() -> @@ -1822,7 +1819,6 @@ t_node_joins_existing_cluster(Config) -> ct:pal("stopping ~p", [N2]), ok = emqx_common_test_helpers:stop_slave(N2) end), - setup_group_subscriber_spy(N2), Nodes = [N1, N2], wait_for_cluster_rpc(N2), @@ -1923,7 +1919,6 @@ t_cluster_node_down(Config) -> Nodes ) end), - lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes), From e43517188fbd61857a3d92e5a32009dcc58e6571 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 17:47:57 -0300 Subject: [PATCH 07/10] fix(bridge): remove bridge if exception happens during creation --- apps/emqx_bridge/src/emqx_bridge.erl | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1fbd6902e..cbbe937aa 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -227,9 +227,13 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> diff_confs(NewConf, OldConf), %% The config update will be failed if any task in `perform_bridge_changes` failed. Result = perform_bridge_changes([ - {fun emqx_bridge_resource:remove/4, Removed}, - {fun emqx_bridge_resource:create/4, Added}, - {fun emqx_bridge_resource:update/4, Updated} + #{action => fun emqx_bridge_resource:remove/4, data => Removed}, + #{ + action => fun emqx_bridge_resource:create/4, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => fun emqx_bridge_resource:update/4, data => Updated} ]), ok = unload_hook(), ok = load_hook(NewConf), @@ -345,7 +349,8 @@ perform_bridge_changes(Tasks) -> perform_bridge_changes([], Result) -> Result; -perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> + OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), Result = maps:fold( fun ({_Type, _Name}, _Conf, {error, Reason}) -> @@ -359,9 +364,21 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> end; ({Type, Name}, Conf, _) -> ResOpts = emqx_resource:fetch_creation_opts(Conf), - case Action(Type, Name, Conf, ResOpts) of + try Action(Type, Name, Conf, ResOpts) of {error, Reason} -> {error, Reason}; Return -> Return + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "bridge_config_update_exception", + kind => Kind, + error => Error, + type => Type, + name => Name, + stacktrace => Stacktrace + }), + OnException(Type, Name, Conf, ResOpts), + erlang:raise(Kind, Error, Stacktrace) end end, Result0, From 32e6213ce3e01cca814015766887de890b4bfb3d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 25 May 2023 15:01:39 -0300 Subject: [PATCH 08/10] fix(resource_manager_sup): use `one_for_one` instead of `simple_one_for_one` Using `simple_one_for_one` has a potential race condition issue where we read the PID of the resource manager before trying to remove a resource, and then that PID changes because it was either dead at first, or it crashed and changed, and later we use this stale PID to try to remove it from the supervisor. Under such circumstances, the restarting child might linger in the supervisor, leaking resources. By using the resource ID itself as a child ID (and using `one_for_one` restart strategy), we ensure the child is truly removed. --- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 57 ++++++++++++++++++- ...emqx_bridge_pulsar_impl_producer_SUITE.erl | 9 +++ .../src/emqx_resource_manager.erl | 3 +- .../src/emqx_resource_manager_sup.erl | 37 ++++++------ 4 files changed, 84 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 8b61b2ee1..7e7acbcd5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -2058,14 +2058,41 @@ t_resource_manager_crash_after_subscriber_started(Config) -> ?tp(resource_manager_killed, #{}), ok end), + %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. Occurs frequently in CI... + + {Res, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, 10_000 ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), ok end, [] @@ -2089,14 +2116,40 @@ t_resource_manager_crash_before_subscriber_started(Config) -> ?tp(resource_manager_killed, #{}), ok end), + %% even if the resource manager is dead, we can still %% clear the allocated resources. - {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + + %% We avoid asserting only the `config_update_crashed' + %% error here because there's a race condition (just a + %% problem for the test assertion below) in which the + %% `emqx_resource_manager:create/5' call returns a failure + %% (not checked) and then `lookup' in that module is + %% delayed enough so that the manager supervisor has time + %% to restart the manager process and for the latter to + %% startup successfully. Occurs frequently in CI... + {Res, {ok, _}} = ?wait_async_action( create_bridge(Config), #{?snk_kind := kafka_consumer_just_client_stopped}, 10_000 ), + case Res of + {error, {config_update_crashed, {killed, _}}} -> + ok; + {ok, _} -> + %% the new manager may have had time to startup + %% before the resource status cache is read... + ok; + _ -> + ct:fail("unexpected result: ~p", [Res]) + end, + ?assertMatch({ok, _}, delete_bridge(Config)), + ?retry( + _Sleep = 50, + _Attempts = 50, + ?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup)) + ), ok end, [] diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl index 4dc318205..a5c04160c 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_impl_producer_SUITE.erl @@ -285,6 +285,11 @@ create_bridge(Config, Overrides) -> PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), emqx_bridge:create(Type, Name, PulsarConfig). +delete_bridge(Config) -> + Type = ?BRIDGE_TYPE_BIN, + Name = ?config(pulsar_name, Config), + emqx_bridge:remove(Type, Name). + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -1008,6 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) -> Producers =/= undefined, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] @@ -1039,6 +1046,8 @@ t_resource_manager_crash_before_producers_started(Config) -> #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, 10_000 ), + ?assertMatch({ok, _}, delete_bridge(Config)), + ?assertEqual([], get_pulsar_producers()), ok end, [] diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7a54bfa97..388251c0b 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -192,14 +192,13 @@ remove(ResId) when is_binary(ResId) -> %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> - ResourceManagerPid = gproc:whereis_name(?NAME(ResId)), try safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) after %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. - emqx_resource_manager_sup:delete_child(ResourceManagerPid) + emqx_resource_manager_sup:delete_child(ResId) end. %% @doc Stops and then starts an instance that was already running diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 9e86e6363..732d5e513 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -26,12 +26,12 @@ -export([init/1]). ensure_child(ResId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), + _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)), ok. -delete_child(Pid) -> - _ = supervisor:terminate_child(?MODULE, Pid), - _ = supervisor:delete_child(?MODULE, Pid), +delete_child(ResId) -> + _ = supervisor:terminate_child(?MODULE, ResId), + _ = supervisor:delete_child(?MODULE, ResId), ok. start_link() -> @@ -44,18 +44,19 @@ init([]) -> public, {read_concurrency, true} ]), - ChildSpecs = [ - #{ - id => emqx_resource_manager, - start => {emqx_resource_manager, start_link, []}, - restart => transient, - %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, - %% resource_manager's terminate callback calls resource on_stop - shutdown => infinity, - type => worker, - modules => [emqx_resource_manager] - } - ], - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, + ChildSpecs = [], + SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. + +child_spec(ResId, Group, ResourceType, Config, Opts) -> + #{ + id => ResId, + start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + restart => transient, + %% never force kill a resource manager. + %% becasue otherwise it may lead to release leak, + %% resource_manager's terminate callback calls resource on_stop + shutdown => infinity, + type => worker, + modules => [emqx_resource_manager] + }. From 7188346d6c3b683e20bf991f5f68c4be55066b73 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 May 2023 11:29:55 -0300 Subject: [PATCH 09/10] ci: improve script Co-authored-by: Zaiming (Stone) Shi --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 200fa7c71..d220c8a50 100644 --- a/Makefile +++ b/Makefile @@ -297,4 +297,4 @@ fmt: $(REBAR) .PHONY: clean-test-cluster-config clean-test-cluster-config: - @rm apps/emqx_conf/data/configs/cluster.hocon || true + @rm -f apps/emqx_conf/data/configs/cluster.hocon || true From d27f593309e928dad2fb361af018d2032d952314 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 May 2023 16:40:18 -0300 Subject: [PATCH 10/10] test: fix flaky authn test --- apps/emqx/test/emqx_authentication_SUITE.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/test/emqx_authentication_SUITE.erl b/apps/emqx/test/emqx_authentication_SUITE.erl index 0190ab936..652b634ca 100644 --- a/apps/emqx/test/emqx_authentication_SUITE.erl +++ b/apps/emqx/test/emqx_authentication_SUITE.erl @@ -98,6 +98,8 @@ init_per_suite(Config) -> LogLevel = emqx_logger:get_primary_log_level(), ok = emqx_logger:set_log_level(debug), application:set_env(ekka, strict_mode, true), + emqx_config:erase_all(), + emqx_common_test_helpers:stop_apps([]), emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), [{log_level, LogLevel} | Config].