fix(kafka_producer): avoid multiplication of metrics when bridge is recreated

This commit is contained in:
Thales Macedo Garitezi 2023-01-02 16:08:26 -03:00
parent 8e59319bfe
commit 0fd8880d0a
1 changed files with 26 additions and 37 deletions

View File

@ -1,8 +1,10 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_impl_kafka_producer).
-include_lib("emqx_resource/include/emqx_resource.hrl").
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
@ -36,7 +38,7 @@ on_start(InstId, Config) ->
%% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
BridgeType = kafka,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
_ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
_ = maybe_install_wolff_telemetry_handlers(ResourceID),
%% it's a bug if producer config is not found
%% the caller should not try to start a producer if
%% there is no producer config
@ -79,7 +81,8 @@ on_start(InstId, Config) ->
{ok, #{
message_template => compile_message_template(MessageTemplate),
client_id => ClientId,
producers => Producers
producers => Producers,
resource_id => ResourceID
}};
{error, Reason2} ->
?SLOG(error, #{
@ -105,7 +108,7 @@ on_start(InstId, Config) ->
throw(failed_to_start_kafka_producer)
end.
on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) ->
_ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
#{
@ -121,7 +124,7 @@ on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
}
),
with_log_at_error(
fun() -> uninstall_telemetry_handlers(InstanceID) end,
fun() -> uninstall_telemetry_handlers(ResourceID) end,
#{
msg => "failed_to_uninstall_telemetry_handlers",
client_id => ClientID
@ -176,27 +179,14 @@ on_kafka_ack(_Partition, _Offset, _Extra) ->
%% Maybe need to bump some counters?
ok.
on_get_status(_InstId, #{producers := Producers}) ->
%% Just to pick some producer.
RandomVal = emqx_guid:gen(),
FakeMsg = [#{key => RandomVal, value => RandomVal}],
%% Note: we must not check the connectivity to Kafka itself, but
%% only if there are producers available. Otherwise, if Kafka
%% goes down, the resource will be considered down and messages
%% *will not* be sent to the wolff producers and buffered,
%% effectively losing the message as there are no buffer workers
%% for Kafka producer.
try wolff_producers:pick_producer(Producers, FakeMsg) of
{_Partition, _Pid} ->
connected
catch
error:{producer_down, _} ->
disconnected
end.
on_get_status(_InstId, _State) ->
connected.
%% Parse comma separated host:port list into a [{Host,Port}] list
hosts(Hosts) ->
emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()).
hosts(Hosts) when is_binary(Hosts) ->
hosts(binary_to_list(Hosts));
hosts(Hosts) when is_list(Hosts) ->
kpro:parse_endpoints(Hosts).
%% Extra socket options, such as sndbuf size etc.
socket_opts(Opts) when is_map(Opts) ->
@ -252,14 +242,10 @@ producers_config(BridgeName, ClientId, Input) ->
mode := BufferMode,
per_partition_limit := PerPartitionLimit,
segment_bytes := SegmentBytes,
memory_overload_protection := MemOLP0
memory_overload_protection := MemOLP
}
} = Input,
MemOLP =
case os:type() of
{unix, linux} -> MemOLP0;
_ -> false
end,
{OffloadMode, ReplayqDir} =
case BufferMode of
memory -> {false, false};
@ -396,20 +382,23 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
%% Event that we do not handle
ok.
-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary().
telemetry_handler_id(InstanceID) ->
<<"emqx-bridge-kafka-producer-", InstanceID/binary>>.
%% Note: don't use the instance/manager ID, as that changes everytime
%% the bridge is recreated, and will lead to multiplication of
%% metrics.
-spec telemetry_handler_id(resource_id()) -> binary().
telemetry_handler_id(ResourceID) ->
<<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
uninstall_telemetry_handlers(InstanceID) ->
HandlerID = telemetry_handler_id(InstanceID),
uninstall_telemetry_handlers(ResourceID) ->
HandlerID = telemetry_handler_id(ResourceID),
telemetry:detach(HandlerID).
maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% Attach event handlers for Kafka telemetry events. If a handler with the
%% handler id already exists, the attach_many function does nothing
telemetry:attach_many(
%% unique handler id
telemetry_handler_id(InstanceID),
telemetry_handler_id(ResourceID),
[
[wolff, dropped],
[wolff, dropped_queue_full],