fix(kafka_producer): avoid multiplication of metrics when bridge is recreated
This commit is contained in:
parent
8e59319bfe
commit
0fd8880d0a
|
@ -1,8 +1,10 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_bridge_impl_kafka_producer).
|
-module(emqx_bridge_impl_kafka_producer).
|
||||||
|
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
%% callbacks of behaviour emqx_resource
|
%% callbacks of behaviour emqx_resource
|
||||||
-export([
|
-export([
|
||||||
callback_mode/0,
|
callback_mode/0,
|
||||||
|
@ -36,7 +38,7 @@ on_start(InstId, Config) ->
|
||||||
%% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
|
%% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
|
||||||
BridgeType = kafka,
|
BridgeType = kafka,
|
||||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
_ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
|
_ = maybe_install_wolff_telemetry_handlers(ResourceID),
|
||||||
%% it's a bug if producer config is not found
|
%% it's a bug if producer config is not found
|
||||||
%% the caller should not try to start a producer if
|
%% the caller should not try to start a producer if
|
||||||
%% there is no producer config
|
%% there is no producer config
|
||||||
|
@ -79,7 +81,8 @@ on_start(InstId, Config) ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
message_template => compile_message_template(MessageTemplate),
|
message_template => compile_message_template(MessageTemplate),
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
producers => Producers
|
producers => Producers,
|
||||||
|
resource_id => ResourceID
|
||||||
}};
|
}};
|
||||||
{error, Reason2} ->
|
{error, Reason2} ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
|
@ -105,7 +108,7 @@ on_start(InstId, Config) ->
|
||||||
throw(failed_to_start_kafka_producer)
|
throw(failed_to_start_kafka_producer)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
|
on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) ->
|
||||||
_ = with_log_at_error(
|
_ = with_log_at_error(
|
||||||
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
|
fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
|
||||||
#{
|
#{
|
||||||
|
@ -121,7 +124,7 @@ on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) ->
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
with_log_at_error(
|
with_log_at_error(
|
||||||
fun() -> uninstall_telemetry_handlers(InstanceID) end,
|
fun() -> uninstall_telemetry_handlers(ResourceID) end,
|
||||||
#{
|
#{
|
||||||
msg => "failed_to_uninstall_telemetry_handlers",
|
msg => "failed_to_uninstall_telemetry_handlers",
|
||||||
client_id => ClientID
|
client_id => ClientID
|
||||||
|
@ -176,27 +179,14 @@ on_kafka_ack(_Partition, _Offset, _Extra) ->
|
||||||
%% Maybe need to bump some counters?
|
%% Maybe need to bump some counters?
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
on_get_status(_InstId, #{producers := Producers}) ->
|
on_get_status(_InstId, _State) ->
|
||||||
%% Just to pick some producer.
|
connected.
|
||||||
RandomVal = emqx_guid:gen(),
|
|
||||||
FakeMsg = [#{key => RandomVal, value => RandomVal}],
|
|
||||||
%% Note: we must not check the connectivity to Kafka itself, but
|
|
||||||
%% only if there are producers available. Otherwise, if Kafka
|
|
||||||
%% goes down, the resource will be considered down and messages
|
|
||||||
%% *will not* be sent to the wolff producers and buffered,
|
|
||||||
%% effectively losing the message as there are no buffer workers
|
|
||||||
%% for Kafka producer.
|
|
||||||
try wolff_producers:pick_producer(Producers, FakeMsg) of
|
|
||||||
{_Partition, _Pid} ->
|
|
||||||
connected
|
|
||||||
catch
|
|
||||||
error:{producer_down, _} ->
|
|
||||||
disconnected
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Parse comma separated host:port list into a [{Host,Port}] list
|
%% Parse comma separated host:port list into a [{Host,Port}] list
|
||||||
hosts(Hosts) ->
|
hosts(Hosts) when is_binary(Hosts) ->
|
||||||
emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()).
|
hosts(binary_to_list(Hosts));
|
||||||
|
hosts(Hosts) when is_list(Hosts) ->
|
||||||
|
kpro:parse_endpoints(Hosts).
|
||||||
|
|
||||||
%% Extra socket options, such as sndbuf size etc.
|
%% Extra socket options, such as sndbuf size etc.
|
||||||
socket_opts(Opts) when is_map(Opts) ->
|
socket_opts(Opts) when is_map(Opts) ->
|
||||||
|
@ -252,14 +242,10 @@ producers_config(BridgeName, ClientId, Input) ->
|
||||||
mode := BufferMode,
|
mode := BufferMode,
|
||||||
per_partition_limit := PerPartitionLimit,
|
per_partition_limit := PerPartitionLimit,
|
||||||
segment_bytes := SegmentBytes,
|
segment_bytes := SegmentBytes,
|
||||||
memory_overload_protection := MemOLP0
|
memory_overload_protection := MemOLP
|
||||||
}
|
}
|
||||||
} = Input,
|
} = Input,
|
||||||
MemOLP =
|
|
||||||
case os:type() of
|
|
||||||
{unix, linux} -> MemOLP0;
|
|
||||||
_ -> false
|
|
||||||
end,
|
|
||||||
{OffloadMode, ReplayqDir} =
|
{OffloadMode, ReplayqDir} =
|
||||||
case BufferMode of
|
case BufferMode of
|
||||||
memory -> {false, false};
|
memory -> {false, false};
|
||||||
|
@ -396,20 +382,23 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
|
||||||
%% Event that we do not handle
|
%% Event that we do not handle
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary().
|
%% Note: don't use the instance/manager ID, as that changes every time
|
||||||
telemetry_handler_id(InstanceID) ->
|
%% the bridge is recreated, and will lead to multiplication of
|
||||||
<<"emqx-bridge-kafka-producer-", InstanceID/binary>>.
|
%% metrics.
|
||||||
|
-spec telemetry_handler_id(resource_id()) -> binary().
|
||||||
|
telemetry_handler_id(ResourceID) ->
|
||||||
|
<<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
|
||||||
|
|
||||||
uninstall_telemetry_handlers(InstanceID) ->
|
uninstall_telemetry_handlers(ResourceID) ->
|
||||||
HandlerID = telemetry_handler_id(InstanceID),
|
HandlerID = telemetry_handler_id(ResourceID),
|
||||||
telemetry:detach(HandlerID).
|
telemetry:detach(HandlerID).
|
||||||
|
|
||||||
maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
|
maybe_install_wolff_telemetry_handlers(ResourceID) ->
|
||||||
%% Attach event handlers for Kafka telemetry events. If a handler with the
|
%% Attach event handlers for Kafka telemetry events. If a handler with the
|
||||||
%% handler id already exists, the attach_many function does nothing
|
%% handler id already exists, the attach_many function does nothing
|
||||||
telemetry:attach_many(
|
telemetry:attach_many(
|
||||||
%% unique handler id
|
%% unique handler id
|
||||||
telemetry_handler_id(InstanceID),
|
telemetry_handler_id(ResourceID),
|
||||||
[
|
[
|
||||||
[wolff, dropped],
|
[wolff, dropped],
|
||||||
[wolff, dropped_queue_full],
|
[wolff, dropped_queue_full],
|
||||||
|
|
Loading…
Reference in New Issue