fix(kafka): kafka bridge replayq dir conflict

This commit is contained in:
Ivan Dyachkov 2023-11-01 08:44:10 +01:00
parent b2ff9b4897
commit 2dd5061643
1 changed files with 64 additions and 36 deletions

View File

@ -63,6 +63,11 @@ tr_config(_Key, Value) ->
%% @doc Config schema is defined in emqx_bridge_kafka.
on_start(InstId, Config) ->
?SLOG(debug, #{
msg => "kafka_client_starting",
instance_id => InstId,
config => emqx_utils:redact(Config)
}),
C = fun(Key) -> check_config(Key, Config) end,
Hosts = C(bootstrap_hosts),
ClientConfig = #{
@ -74,36 +79,8 @@ on_start(InstId, Config) ->
ssl => C(ssl)
},
ClientId = InstId,
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
{ok, _} ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok ->
ok;
{error, Error} ->
deallocate_client(ClientId),
throw({failed_to_connect, Error})
end;
{error, Reason} ->
deallocate_client(ClientId),
throw({failed_to_find_created_client, Reason})
end,
?SLOG(info, #{
msg => "kafka_client_started",
instance_id => InstId,
kafka_hosts => Hosts
});
{error, Reason} ->
?SLOG(error, #{
msg => failed_to_start_kafka_client,
instance_id => InstId,
kafka_hosts => Hosts,
reason => Reason
}),
throw(failed_to_start_kafka_client)
end,
emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ok = ensure_client(ClientId, Hosts, ClientConfig),
%% Check if this is a dry run
{ok, #{
client_id => ClientId,
@ -156,7 +133,7 @@ create_producers_for_bridge_v2(
end,
ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
WolffProducerConfig = producers_config(
BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id
BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
{ok, Producers} ->
@ -215,6 +192,54 @@ on_stop(InstanceId, _State) ->
?tp(kafka_producer_stopped, #{instance_id => InstanceId}),
ok.
%% Make sure a wolff client registered under ClientId exists and can reach
%% Kafka.
%%
%% * Client already registered: only its connectivity is checked.
%% * Lookup reports `no_such_client': a supervised client is started for
%%   Hosts/ClientConfig, its connectivity is verified via
%%   ensure_connectivity/1 and the start is logged at info level.
%% * Any other lookup error: the client is deallocated and
%%   `{failed_to_find_created_client, Reason}' is thrown.
%%
%% Throws `{failed_to_connect, Error}' (after deallocating the client) when
%% the connectivity check of an existing client fails, and
%% `failed_to_start_kafka_client' (after logging at error level, without
%% deallocation) when the supervised client cannot be started.
%%
%% On the freshly-started path the return value is whatever the ?SLOG
%% expression evaluates to; the caller matches the result against `ok'.
ensure_client(ClientId, Hosts, ClientConfig) ->
    %% Shared failure path: stop and delete the supervised client, then abort.
    Abort = fun(Exception) ->
        deallocate_client(ClientId),
        throw(Exception)
    end,
    case wolff_client_sup:find_client(ClientId) of
        %% NOTE: this clause must stay ahead of the generic error clause below.
        {error, no_such_client} ->
            StartResult = wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig),
            case StartResult of
                {error, Reason} ->
                    ?SLOG(error, #{
                        msg => failed_to_start_kafka_client,
                        client_id => ClientId,
                        kafka_hosts => Hosts,
                        reason => Reason
                    }),
                    throw(failed_to_start_kafka_client);
                {ok, _} ->
                    ok = ensure_connectivity(ClientId),
                    ?SLOG(info, #{
                        msg => "kafka_client_started",
                        client_id => ClientId,
                        kafka_hosts => Hosts
                    })
            end;
        {error, Reason} ->
            Abort({failed_to_find_created_client, Reason});
        {ok, Pid} ->
            case wolff_client:check_connectivity(Pid) of
                {error, Error} -> Abort({failed_to_connect, Error});
                ok -> ok
            end
    end.
%% Verify that the wolff client registered under ClientId exists and passes
%% wolff_client:check_connectivity/1.
%%
%% Returns `ok' on success. On failure the client is deallocated first and
%% then one of the following is thrown:
%%   `{failed_to_find_created_client, Reason}' - the client lookup failed;
%%   `{failed_to_connect, Error}'              - the connectivity check failed.
ensure_connectivity(ClientId) ->
    %% Shared failure path: stop and delete the supervised client, then abort.
    Abort = fun(Exception) ->
        deallocate_client(ClientId),
        throw(Exception)
    end,
    case wolff_client_sup:find_client(ClientId) of
        {error, Reason} ->
            Abort({failed_to_find_created_client, Reason});
        {ok, Pid} ->
            case wolff_client:check_connectivity(Pid) of
                {error, Error} -> Abort({failed_to_connect, Error});
                ok -> ok
            end
    end.
deallocate_client(ClientId) ->
_ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
@ -573,7 +598,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) ->
false.
producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) ->
producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
#{
max_batch_bytes := MaxBatchBytes,
compression := Compression,
@ -596,8 +621,8 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id)
{OffloadMode, ReplayqDir} =
case BufferMode of
memory -> {false, false};
disk -> {false, replayq_dir(ClientId)};
hybrid -> {true, replayq_dir(ClientId)}
disk -> {false, replayq_dir(BridgeType, BridgeName)};
hybrid -> {true, replayq_dir(BridgeType, BridgeName)}
end,
#{
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
@ -620,8 +645,11 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id)
%% Map the configured partition strategy atom to the partitioner atom returned
%% to the caller: `key_dispatch' becomes `first_key_dispatch', `random' is
%% passed through unchanged. Any other value fails with `function_clause'.
partitioner(key_dispatch) -> first_key_dispatch;
partitioner(random) -> random.
%% Path of the replayq directory for a client: <data_dir>/kafka/<ClientId>,
%% where <data_dir> is the value returned by emqx:data_dir().
replayq_dir(ClientId) ->
    DataDir = emqx:data_dir(),
    filename:join([DataDir, "kafka", ClientId]).
%% Path of the replayq directory for a bridge:
%%   <data_dir>/kafka/<downgraded type>:<BridgeName>:<node name>
%% The directory name is derived from the bridge identity and the current
%% node name instead of a client id.
%% NOTE(review): assumes emqx_bridge_lib:downgrade_type/1 returns iodata
%% (binary or string) here, as required by iolist_to_binary/1 - confirm.
replayq_dir(BridgeType, BridgeName) ->
    DowngradedType = emqx_bridge_lib:downgrade_type(BridgeType),
    NodeName = atom_to_list(node()),
    DirName = iolist_to_binary([DowngradedType, ":", BridgeName, ":", NodeName]),
    DataDir = emqx:data_dir(),
    filename:join([DataDir, "kafka", DirName]).
%% Producer name must be an atom which will be used as a ETS table name for
%% partition worker lookup.