fix(kafka_producer): fix message loss when kafka connection is down

This commit is contained in:
Thales Macedo Garitezi 2023-01-02 15:04:53 -03:00
parent 5bd9f110d6
commit 8e59319bfe
1 changed files with 16 additions and 4 deletions

View File

@ -176,10 +176,22 @@ on_kafka_ack(_Partition, _Offset, _Extra) ->
%% Maybe need to bump some counters? %% Maybe need to bump some counters?
ok. ok.
on_get_status(_InstId, #{client_id := ClientID}) -> on_get_status(_InstId, #{producers := Producers}) ->
case wolff:check_connectivity(ClientID) of %% Just to pick some producer.
ok -> connected; RandomVal = emqx_guid:gen(),
_ -> disconnected FakeMsg = [#{key => RandomVal, value => RandomVal}],
%% Note: we must not check the connectivity to Kafka itself, but
%% only if there are producers available. Otherwise, if Kafka
%% goes down, the resource will be considered down and messages
%% *will not* be sent to the wolff producers and buffered,
%% effectively losing the message as there are no buffer workers
%% for Kafka producer.
try wolff_producers:pick_producer(Producers, FakeMsg) of
{_Partition, _Pid} ->
connected
catch
error:{producer_down, _} ->
disconnected
end. end.
%% Parse comma separated host:port list into a [{Host,Port}] list %% Parse comma separated host:port list into a [{Host,Port}] list