fix(kafka_producer): avoid multiplication of metrics when bridge is recreated
This commit is contained in:
parent
8e59319bfe
commit
0fd8880d0a
|
@ -1,8 +1,10 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_impl_kafka_producer).
|
||||
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
%% callbacks of behaviour emqx_resource
|
||||
-export([
|
||||
callback_mode/0,
|
||||
|
@ -36,7 +38,7 @@ on_start(InstId, Config) ->
|
|||
%% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
|
||||
BridgeType = kafka,
|
||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
_ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
|
||||
_ = maybe_install_wolff_telemetry_handlers(ResourceID),
|
||||
%% it's a bug if producer config is not found
|
||||
%% the caller should not try to start a producer if
|
||||
%% there is no producer config
|
||||
|
@ -79,7 +81,8 @@ on_start(InstId, Config) ->
|
|||
{ok, #{
|
||||
message_template => compile_message_template(MessageTemplate),
|
||||
client_id => ClientId,
|
||||
producers => Producers
|
||||
producers => Producers,
|
||||
resource_id => ResourceID
|
||||
}};
|
||||
{error, Reason2} ->
|
||||
?SLOG(error, #{
|
||||
|
@ -105,7 +108,7 @@ on_start(InstId, Config) ->
|
|||
throw(failed_to_start_kafka_producer)
|
||||
end.
|
||||
|
||||
on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
|
||||
on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) ->
|
||||
_ = with_log_at_error(
|
||||
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
|
||||
#{
|
||||
|
@ -121,7 +124,7 @@ on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
|
|||
}
|
||||
),
|
||||
with_log_at_error(
|
||||
fun() -> uninstall_telemetry_handlers(InstanceID) end,
|
||||
fun() -> uninstall_telemetry_handlers(ResourceID) end,
|
||||
#{
|
||||
msg => "failed_to_uninstall_telemetry_handlers",
|
||||
client_id => ClientID
|
||||
|
@ -176,27 +179,14 @@ on_kafka_ack(_Partition, _Offset, _Extra) ->
|
|||
%% Maybe need to bump some counters?
|
||||
ok.
|
||||
|
||||
on_get_status(_InstId, #{producers := Producers}) ->
|
||||
%% Just to pick some producer.
|
||||
RandomVal = emqx_guid:gen(),
|
||||
FakeMsg = [#{key => RandomVal, value => RandomVal}],
|
||||
%% Note: we must not check the connectivity to Kafka itself, but
|
||||
%% only if there are producers available. Otherwise, if Kafka
|
||||
%% goes down, the resource will be considered down and messages
|
||||
%% *will not* be sent to the wolff producers and buffered,
|
||||
%% effectively losing the message as there are no buffer workers
|
||||
%% for Kafka producer.
|
||||
try wolff_producers:pick_producer(Producers, FakeMsg) of
|
||||
{_Partition, _Pid} ->
|
||||
connected
|
||||
catch
|
||||
error:{producer_down, _} ->
|
||||
disconnected
|
||||
end.
|
||||
on_get_status(_InstId, _State) ->
|
||||
connected.
|
||||
|
||||
%% Parse a comma-separated host:port list into a [{Host, Port}] list
|
||||
hosts(Hosts) ->
|
||||
emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()).
|
||||
hosts(Hosts) when is_binary(Hosts) ->
|
||||
hosts(binary_to_list(Hosts));
|
||||
hosts(Hosts) when is_list(Hosts) ->
|
||||
kpro:parse_endpoints(Hosts).
|
||||
|
||||
%% Extra socket options, such as sndbuf size etc.
|
||||
socket_opts(Opts) when is_map(Opts) ->
|
||||
|
@ -252,14 +242,10 @@ producers_config(BridgeName, ClientId, Input) ->
|
|||
mode := BufferMode,
|
||||
per_partition_limit := PerPartitionLimit,
|
||||
segment_bytes := SegmentBytes,
|
||||
memory_overload_protection := MemOLP0
|
||||
memory_overload_protection := MemOLP
|
||||
}
|
||||
} = Input,
|
||||
MemOLP =
|
||||
case os:type() of
|
||||
{unix, linux} -> MemOLP0;
|
||||
_ -> false
|
||||
end,
|
||||
|
||||
{OffloadMode, ReplayqDir} =
|
||||
case BufferMode of
|
||||
memory -> {false, false};
|
||||
|
@ -396,20 +382,23 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
|
|||
%% Event that we do not handle
|
||||
ok.
|
||||
|
||||
-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary().
|
||||
telemetry_handler_id(InstanceID) ->
|
||||
<<"emqx-bridge-kafka-producer-", InstanceID/binary>>.
|
||||
%% Note: don't use the instance/manager ID, as that changes every time
|
||||
%% the bridge is recreated, and will lead to multiplication of
|
||||
%% metrics.
|
||||
-spec telemetry_handler_id(resource_id()) -> binary().
|
||||
telemetry_handler_id(ResourceID) ->
|
||||
<<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
|
||||
|
||||
uninstall_telemetry_handlers(InstanceID) ->
|
||||
HandlerID = telemetry_handler_id(InstanceID),
|
||||
uninstall_telemetry_handlers(ResourceID) ->
|
||||
HandlerID = telemetry_handler_id(ResourceID),
|
||||
telemetry:detach(HandlerID).
|
||||
|
||||
maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
|
||||
maybe_install_wolff_telemetry_handlers(ResourceID) ->
|
||||
%% Attach event handlers for Kafka telemetry events. If a handler with the
|
||||
%% handler id already exists, the attach_many function does nothing
|
||||
telemetry:attach_many(
|
||||
%% unique handler id
|
||||
telemetry_handler_id(InstanceID),
|
||||
telemetry_handler_id(ResourceID),
|
||||
[
|
||||
[wolff, dropped],
|
||||
[wolff, dropped_queue_full],
|
||||
|
|
Loading…
Reference in New Issue