diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 09712c29d..0a03f582f 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -33,7 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, - _ = maybe_install_wolff_telemetry_handlers(), + _ = maybe_install_wolff_telemetry_handlers(InstId), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -89,7 +89,7 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_producer) end. -on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> +on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ @@ -103,6 +103,13 @@ on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> msg => "failed_to_delete_kafka_client", client_id => ClientID } + ), + with_log_at_error( + fun() -> uninstall_telemetry_handlers(InstanceID) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + client_id => ClientID + } ). %% @doc The callback API for rule-engine (or bridge without rules) @@ -337,12 +344,20 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. -maybe_install_wolff_telemetry_handlers() -> +-spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). +telemetry_handler_id(InstanceID) -> + <<"emqx-bridge-kafka-producer-", InstanceID/binary, "-telemetry-handler">>. + +uninstall_telemetry_handlers(InstanceID) -> + HandlerID = telemetry_handler_id(InstanceID), + telemetry:detach(HandlerID). + +maybe_install_wolff_telemetry_handlers(InstanceID) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - <<"emqx-bridge-kafka-producer-telemetry-handler">>, + telemetry_handler_id(InstanceID), %% Note: we don't handle `[wolff, success]' because, %% currently, we already increment the success counter for %% this resource at `emqx_rule_runtime:handle_action' when @@ -358,6 +373,6 @@ maybe_install_wolff_telemetry_handlers() -> [wolff, retried_failed], [wolff, retried_success] ], - fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, + fun ?MODULE:handle_telemetry_event/4, [] ).