From 7242ce426b5f5e842980acbee5c853c11e27fe1b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 19 Dec 2022 16:09:51 -0300 Subject: [PATCH] fix(kafka_producer): cleanup client after failing to start producers https://emqx.atlassian.net/browse/EMQX-8547 If a Kafka Producer bridge is given bad configuration (e.g.: bad authn credentials), the Wolff client process is started successfully, as it does not attempt to connect, but when the producer process is attempted to be started, it fails (only then the client tries to connect to Kafka). At this point, an error was thrown, but the supervised client process remained running. If the configuration was later fixed and the bridge updated, which prompted its removal and recreation, the Wolff client would report to be "already started", so it would never pick up the new (fixed) configuration, and the producers would perpetually fail to start until the node would be restarted. We simply ensure the client is stopped before throwing the error, unrolling the start-up procedure. --- .../kafka/emqx_bridge_impl_kafka_producer.erl | 13 ++++ .../emqx_bridge_impl_kafka_producer_SUITE.erl | 61 +++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 0eeb9db0c..6145716f2 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -86,6 +86,19 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, reason => Reason2 }), + %% Need to stop the already running client; otherwise, the + %% next `on_start' call will try to ensure the client + %% exists and it will be already present and using the old + %% config. This is specially bad if the original crash + %% was due to misconfiguration and we are trying to fix + %% it... + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), throw(failed_to_start_kafka_producer) end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index c9a8230ba..44149826d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -337,6 +337,67 @@ kafka_bridge_rest_api_helper(Config) -> false = MyKafkaBridgeExists(), ok. +%%------------------------------------------------------------------------------ +%% Other tests +%%------------------------------------------------------------------------------ + +%% Need to stop the already running client; otherwise, the +%% next `on_start' call will try to ensure the client +%% exists and it will. This is specially bad if the +%% original crash was due to misconfiguration and we are +%% trying to fix it... +t_failed_creation_then_fix(_Config) -> + HostsString = kafka_hosts_string_sasl(), + ValidAuthSettings = valid_sasl_plain_settings(), + WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"}, + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id("kafka", Name), + BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), + KafkaTopic = "test-topic-one-partition", + WrongConf = config(#{ + "authentication" => WrongAuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + ValidConf = config(#{ + "authentication" => ValidAuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + %% creates, but fails to start producers + %% FIXME: change to kafka_producer after config refactoring + ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})), + ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)), + %% before throwing, it should cleanup the client process. + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + %% FIXME: change to kafka_producer after config refactoring + %% must succeed with correct config + ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})), + {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf), + %% To make sure we get unique value + timer:sleep(1), + Time = erlang:monotonic_time(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)), + {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), + %% TODO: refactor those into init/end per testcase + ok = ?PRODUCER:on_stop(ResourceId, State), + ok = emqx_bridge_resource:remove(BridgeId), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------