From 2dd506164328b8f3bc29befdef47f40d4043fb64 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Wed, 1 Nov 2023 08:44:10 +0100 Subject: [PATCH] fix(kafka): kafka bridge replaq dir conflict --- .../src/emqx_bridge_kafka_impl_producer.erl | 100 +++++++++++------- 1 file changed, 64 insertions(+), 36 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 50c2ddbe1..2c6e6bc1c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -63,6 +63,11 @@ tr_config(_Key, Value) -> %% @doc Config schema is defined in emqx_bridge_kafka. on_start(InstId, Config) -> + ?SLOG(debug, #{ + msg => "kafka_client_starting", + instance_id => InstId, + config => emqx_utils:redact(Config) + }), C = fun(Key) -> check_config(Key, Config) end, Hosts = C(bootstrap_hosts), ClientConfig = #{ @@ -74,36 +79,8 @@ on_start(InstId, Config) -> ssl => C(ssl) }, ClientId = InstId, - ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), - case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of - {ok, _} -> - case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> - ok; - {error, Error} -> - deallocate_client(ClientId), - throw({failed_to_connect, Error}) - end; - {error, Reason} -> - deallocate_client(ClientId), - throw({failed_to_find_created_client, Reason}) - end, - ?SLOG(info, #{ - msg => "kafka_client_started", - instance_id => InstId, - kafka_hosts => Hosts - }); - {error, Reason} -> - ?SLOG(error, #{ - msg => failed_to_start_kafka_client, - instance_id => InstId, - kafka_hosts => Hosts, - reason => Reason - }), - throw(failed_to_start_kafka_client) - end, + emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), + ok = ensure_client(ClientId, Hosts, ClientConfig), %% Check if this is a dry run {ok, #{ client_id => ClientId, @@ -156,7 +133,7 @@ create_producers_for_bridge_v2( end, ok = check_topic_and_leader_connections(ClientId, KafkaTopic), WolffProducerConfig = producers_config( - BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id + BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id ), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> @@ -215,6 +192,54 @@ on_stop(InstanceId, _State) -> ?tp(kafka_producer_stopped, #{instance_id => InstanceId}), ok. +ensure_client(ClientId, Hosts, ClientConfig) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + case wolff_client:check_connectivity(Pid) of + ok -> + ok; + {error, Error} -> + deallocate_client(ClientId), + throw({failed_to_connect, Error}) + end; + {error, no_such_client} -> + case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of + {ok, _} -> + ok = ensure_connectivity(ClientId), + ?SLOG(info, #{ + msg => "kafka_client_started", + client_id => ClientId, + kafka_hosts => Hosts + }); + {error, Reason} -> + ?SLOG(error, #{ + msg => failed_to_start_kafka_client, + client_id => ClientId, + kafka_hosts => Hosts, + reason => Reason + }), + throw(failed_to_start_kafka_client) + end; + {error, Reason} -> + deallocate_client(ClientId), + throw({failed_to_find_created_client, Reason}) + end. + +ensure_connectivity(ClientId) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + case wolff_client:check_connectivity(Pid) of + ok -> + ok; + {error, Error} -> + deallocate_client(ClientId), + throw({failed_to_connect, Error}) + end; + {error, Reason} -> + deallocate_client(ClientId), + throw({failed_to_find_created_client, Reason}) + end. + deallocate_client(ClientId) -> _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, @@ -573,7 +598,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> false. -producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) -> +producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -596,8 +621,8 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) {OffloadMode, ReplayqDir} = case BufferMode of memory -> {false, false}; - disk -> {false, replayq_dir(ClientId)}; - hybrid -> {true, replayq_dir(ClientId)} + disk -> {false, replayq_dir(BridgeType, BridgeName)}; + hybrid -> {true, replayq_dir(BridgeType, BridgeName)} end, #{ name => make_producer_name(BridgeType, BridgeName, IsDryRun), @@ -620,8 +645,11 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) partitioner(random) -> random; partitioner(key_dispatch) -> first_key_dispatch. -replayq_dir(ClientId) -> - filename:join([emqx:data_dir(), "kafka", ClientId]). +replayq_dir(BridgeType, BridgeName) -> + DirName = iolist_to_binary([ + emqx_bridge_lib:downgrade_type(BridgeType), ":", BridgeName, ":", atom_to_list(node()) + ]), + filename:join([emqx:data_dir(), "kafka", DirName]). %% Producer name must be an atom which will be used as a ETS table name for %% partition worker lookup.