fix(kafka_producer): cleanup client after failing to start producers

https://emqx.atlassian.net/browse/EMQX-8547

If a Kafka Producer bridge is given bad configuration (e.g.: bad authn
credentials), the Wolff client process is started successfully, as it
does not attempt to connect, but when the producer process is
attempted to be started, it fails (only then the client tries to
connect to Kafka).  At this point, an error was thrown, but the
supervised client process remained running.

If the configuration was later fixed and the bridge updated, which
prompted its removal and recreation, the Wolff client would report to
be "already started", so it would never pick up the new (fixed)
configuration, and the producers would perpetually fail to start until
the node would be restarted.

We simply ensure the client is stopped before throwing the error,
unrolling the start-up procedure.
This commit is contained in:
Thales Macedo Garitezi 2022-12-19 16:09:51 -03:00
parent 7aee1a08aa
commit 7242ce426b
2 changed files with 74 additions and 0 deletions

View File

@ -86,6 +86,19 @@ on_start(InstId, Config) ->
kafka_topic => KafkaTopic, kafka_topic => KafkaTopic,
reason => Reason2 reason => Reason2
}), }),
%% Need to stop the already running client; otherwise, the
%% next `on_start' call will try to ensure the client
%% exists and it will be already present and using the old
%% config. This is specially bad if the original crash
%% was due to misconfiguration and we are trying to fix
%% it...
_ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
#{
msg => "failed_to_delete_kafka_client",
client_id => ClientId
}
),
throw(failed_to_start_kafka_producer) throw(failed_to_start_kafka_producer)
end. end.

View File

@ -337,6 +337,67 @@ kafka_bridge_rest_api_helper(Config) ->
false = MyKafkaBridgeExists(), false = MyKafkaBridgeExists(),
ok. ok.
%%------------------------------------------------------------------------------
%% Other tests
%%------------------------------------------------------------------------------
%% Need to stop the already running client; otherwise, the
%% next `on_start' call will try to ensure the client
%% exists and it will. This is specially bad if the
%% original crash was due to misconfiguration and we are
%% trying to fix it...
t_failed_creation_then_fix(_Config) ->
HostsString = kafka_hosts_string_sasl(),
ValidAuthSettings = valid_sasl_plain_settings(),
WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"},
Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
ResourceId = emqx_bridge_resource:resource_id("kafka", Name),
BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
KafkaTopic = "test-topic-one-partition",
WrongConf = config(#{
"authentication" => WrongAuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"instance_id" => ResourceId,
"ssl" => #{}
}),
ValidConf = config(#{
"authentication" => ValidAuthSettings,
"kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic,
"instance_id" => ResourceId,
"ssl" => #{}
}),
%% creates, but fails to start producers
%% FIXME: change to kafka_producer after config refactoring
?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})),
?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)),
%% before throwing, it should cleanup the client process.
?assertEqual([], supervisor:which_children(wolff_client_sup)),
%% FIXME: change to kafka_producer after config refactoring
%% must succeed with correct config
?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})),
{ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf),
%% To make sure we get unique value
timer:sleep(1),
Time = erlang:monotonic_time(),
BinTime = integer_to_binary(Time),
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
ct:pal("base offset before testing ~p", [Offset]),
?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
{ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
%% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State),
ok = emqx_bridge_resource:remove(BridgeId),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper functions %% Helper functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------