diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 666ab3f8e..a012a7ab0 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -1,8 +1,10 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_impl_kafka_producer). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + %% callbacks of behaviour emqx_resource -export([ callback_mode/0, @@ -36,7 +38,7 @@ on_start(InstId, Config) -> %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer BridgeType = kafka, ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID), + _ = maybe_install_wolff_telemetry_handlers(ResourceID), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -79,7 +81,8 @@ on_start(InstId, Config) -> {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, - producers => Producers + producers => Producers, + resource_id => ResourceID }}; {error, Reason2} -> ?SLOG(error, #{ @@ -105,7 +108,7 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_producer) end. -on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> +on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ @@ -121,7 +124,7 @@ on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> } ), with_log_at_error( - fun() -> uninstall_telemetry_handlers(InstanceID) end, + fun() -> uninstall_telemetry_handlers(ResourceID) end, #{ msg => "failed_to_uninstall_telemetry_handlers", client_id => ClientID @@ -176,27 +179,14 @@ on_kafka_ack(_Partition, _Offset, _Extra) -> %% Maybe need to bump some counters? ok. -on_get_status(_InstId, #{producers := Producers}) -> - %% Just to pick some producer. - RandomVal = emqx_guid:gen(), - FakeMsg = [#{key => RandomVal, value => RandomVal}], - %% Note: we must not check the connectivity to Kafka itself, but - %% only if there are producers available. Otherwise, if Kafka - %% goes down, the resource will be considered down and messages - %% *will not* be sent to the wolff producers and buffered, - %% effectively losing the message as there are no buffer workers - %% for Kafka producer. - try wolff_producers:pick_producer(Producers, FakeMsg) of - {_Partition, _Pid} -> - connected - catch - error:{producer_down, _} -> - disconnected - end. +on_get_status(_InstId, _State) -> + connected. %% Parse comma separated host:port list into a [{Host,Port}] list -hosts(Hosts) -> - emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()). +hosts(Hosts) when is_binary(Hosts) -> + hosts(binary_to_list(Hosts)); +hosts(Hosts) when is_list(Hosts) -> + kpro:parse_endpoints(Hosts). %% Extra socket options, such as sndbuf size etc. socket_opts(Opts) when is_map(Opts) -> @@ -252,14 +242,10 @@ producers_config(BridgeName, ClientId, Input) -> mode := BufferMode, per_partition_limit := PerPartitionLimit, segment_bytes := SegmentBytes, - memory_overload_protection := MemOLP0 + memory_overload_protection := MemOLP } } = Input, - MemOLP = - case os:type() of - {unix, linux} -> MemOLP0; - _ -> false - end, + {OffloadMode, ReplayqDir} = case BufferMode of memory -> {false, false}; @@ -396,20 +382,23 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. --spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). -telemetry_handler_id(InstanceID) -> - <<"emqx-bridge-kafka-producer-", InstanceID/binary>>. +%% Note: don't use the instance/manager ID, as that changes everytime +%% the bridge is recreated, and will lead to multiplication of +%% metrics. +-spec telemetry_handler_id(resource_id()) -> binary(). +telemetry_handler_id(ResourceID) -> + <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. -uninstall_telemetry_handlers(InstanceID) -> - HandlerID = telemetry_handler_id(InstanceID), +uninstall_telemetry_handlers(ResourceID) -> + HandlerID = telemetry_handler_id(ResourceID), telemetry:detach(HandlerID). -maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) -> +maybe_install_wolff_telemetry_handlers(ResourceID) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(InstanceID), + telemetry_handler_id(ResourceID), [ [wolff, dropped], [wolff, dropped_queue_full],