diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 9c9f05852..890a07a30 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -31,10 +31,9 @@ -include_lib("emqx/include/logger.hrl"). %% Allocatable resources --define(kafka_telementry_id, kafka_telementry_id). +-define(kafka_telemetry_id, kafka_telemetry_id). -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). --define(CONNECTOR_TYPE, kafka). query_mode(#{kafka := #{query_mode := sync}}) -> simple_sync_internal_buffer; @@ -43,22 +42,22 @@ query_mode(_) -> callback_mode() -> async_if_possible. -%% @doc Config schema is defined in emqx_connector_kafka. +%% @doc Config schema is defined in emqx_bridge_kafka. on_start(InstId, Config) -> #{ authentication := Auth, bootstrap_hosts := Hosts0, connector_name := ConnectorName, + connector_type := ConnectorType, connect_timeout := ConnTimeout, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, ssl := SSL } = Config, - ConnectorType = ?CONNECTOR_TYPE, ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), - ClientId = emqx_bridge_kafka_impl:make_client_id(?CONNECTOR_TYPE, ConnectorName), + ClientId = emqx_bridge_kafka_impl:make_client_id(ConnectorType, ConnectorName), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, @@ -160,7 +159,7 @@ create_producers_for_bridge_v2( {ok, Producers} -> ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), ok = emqx_resource:allocate_resource( - InstId, {?kafka_telementry_id, BridgeV2Id}, BridgeV2Id + InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id ), _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), {ok, #{ @@ -203,8 +202,8 @@ on_stop(InstanceId, _State) -> fun ({?kafka_producers, _BridgeV2Id}, Producers) -> deallocate_producers(ClientId, Producers); - ({?kafka_telementry_id, _BridgeV2Id}, TelementryId) -> - deallocate_telementry_handlers(TelementryId); + ({?kafka_telemetry_id, _BridgeV2Id}, TelemetryId) -> + deallocate_telemetry_handlers(TelemetryId); (_, _) -> ok end, @@ -231,12 +230,12 @@ deallocate_producers(ClientId, Producers) -> } ). -deallocate_telementry_handlers(TelementryId) -> +deallocate_telemetry_handlers(TelemetryId) -> _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(TelementryId) end, + fun() -> uninstall_telemetry_handlers(TelemetryId) end, #{ msg => "failed_to_uninstall_telemetry_handlers", - resource_id => TelementryId + resource_id => TelemetryId } ). @@ -249,10 +248,10 @@ remove_producers_for_bridge_v2( fun ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> deallocate_producers(ClientId, Producers); - ({?kafka_telementry_id, BridgeV2IdCheck}, TelementryId) when + ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when BridgeV2IdCheck =:= BridgeV2Id -> - deallocate_telementry_handlers(TelementryId); + deallocate_telemetry_handlers(TelemetryId); (_, _) -> ok end,