diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 0eeb9db0c..6145716f2 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -86,6 +86,19 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, reason => Reason2 }), + %% Need to stop the already running client; otherwise, the + %% next `on_start' call will try to ensure the client + %% exists and it will be already present and using the old + %% config. This is specially bad if the original crash + %% was due to misconfiguration and we are trying to fix + %% it... + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), throw(failed_to_start_kafka_producer) end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index c9a8230ba..44149826d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -337,6 +337,67 @@ kafka_bridge_rest_api_helper(Config) -> false = MyKafkaBridgeExists(), ok. +%%------------------------------------------------------------------------------ +%% Other tests +%%------------------------------------------------------------------------------ + +%% Need to stop the already running client; otherwise, the +%% next `on_start' call will try to ensure the client +%% exists and it will. This is specially bad if the +%% original crash was due to misconfiguration and we are +%% trying to fix it... +t_failed_creation_then_fix(_Config) -> + HostsString = kafka_hosts_string_sasl(), + ValidAuthSettings = valid_sasl_plain_settings(), + WrongAuthSettings = ValidAuthSettings#{"password" := "wrong"}, + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id("kafka", Name), + BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), + KafkaTopic = "test-topic-one-partition", + WrongConf = config(#{ + "authentication" => WrongAuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + ValidConf = config(#{ + "authentication" => ValidAuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "ssl" => #{} + }), + %% creates, but fails to start producers + %% FIXME: change to kafka_producer after config refactoring + ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), WrongConf, #{})), + ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConf)), + %% before throwing, it should cleanup the client process. + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + %% FIXME: change to kafka_producer after config refactoring + %% must succeed with correct config + ?assertMatch(ok, emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), ValidConf, #{})), + {ok, State} = ?PRODUCER:on_start(ResourceId, ValidConf), + %% To make sure we get unique value + timer:sleep(1), + Time = erlang:monotonic_time(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)), + {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), + %% TODO: refactor those into init/end per testcase + ok = ?PRODUCER:on_stop(ResourceId, State), + ok = emqx_bridge_resource:remove(BridgeId), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------