diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
index a4ee994b5..94aff0714 100644
--- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
+++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl
@@ -33,7 +33,10 @@ on_start(InstId, Config) ->
         authentication := Auth,
         ssl := SSL
     } = Config,
-    _ = maybe_install_wolff_telemetry_handlers(InstId),
+    %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer
+    BridgeType = kafka,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
+    _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID),
     %% it's a bug if producer config is not found
     %% the caller should not try to start a producer if
     %% there is no producer config
@@ -137,7 +140,7 @@ on_query(_InstId, {send_message, Message}, #{message_template := Template, produ
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
     {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
-    ok.
+    {async_return, ok}.
 
 compile_message_template(#{
     key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
@@ -299,62 +302,72 @@ get_required(Field, Config, Throw) ->
     Value =:= none andalso throw(Throw),
     Value.
 
+%% we *must* match the bridge id in the event metadata with that in
+%% the handler config; otherwise, multiple kafka producer bridges will
+%% install multiple handlers to the same wolff events, multiplying the metric counts...
 handle_telemetry_event(
     [wolff, dropped],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_inc(ID, Val);
 handle_telemetry_event(
     [wolff, dropped_queue_full],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:dropped_queue_full_inc(ID, Val);
 handle_telemetry_event(
     [wolff, queuing],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:queuing_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_inc(ID, Val);
 handle_telemetry_event(
     [wolff, failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, inflight],
-    #{counter_inc := _Val},
+    #{gauge_set := Val},
     #{bridge_id := ID, partition_id := PartitionID},
-    _HandlerConfig
-) when is_integer(_Val) ->
-    emqx_resource_metrics:inflight_set(ID, PartitionID, 0);
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:inflight_set(ID, PartitionID, Val);
 handle_telemetry_event(
     [wolff, retried_failed],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_failed_inc(ID, Val);
 handle_telemetry_event(
     [wolff, retried_success],
     #{counter_inc := Val},
     #{bridge_id := ID},
-    _HandlerConfig
+    #{bridge_id := ID}
 ) when is_integer(Val) ->
     emqx_resource_metrics:retried_success_inc(ID, Val);
+handle_telemetry_event(
+    [wolff, success],
+    #{counter_inc := Val},
+    #{bridge_id := ID},
+    #{bridge_id := ID}
+) when is_integer(Val) ->
+    emqx_resource_metrics:success_inc(ID, Val);
 handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
     %% Event that we do not handle
     ok.
@@ -367,17 +380,12 @@ uninstall_telemetry_handlers(InstanceID) ->
     HandlerID = telemetry_handler_id(InstanceID),
     telemetry:detach(HandlerID).
 
-maybe_install_wolff_telemetry_handlers(InstanceID) ->
+maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
         telemetry_handler_id(InstanceID),
-        %% Note: we don't handle `[wolff, success]' because,
-        %% currently, we already increment the success counter for
-        %% this resource at `emqx_rule_runtime:handle_action' when
-        %% the response is `ok' and we would double increment it
-        %% here.
         [
             [wolff, dropped],
             [wolff, dropped_queue_full],
@@ -386,8 +394,13 @@ maybe_install_wolff_telemetry_handlers(InstanceID) ->
             [wolff, failed],
             [wolff, inflight],
             [wolff, retried_failed],
-            [wolff, retried_success]
+            [wolff, retried_success],
+            [wolff, success]
         ],
         fun ?MODULE:handle_telemetry_event/4,
-        []
+        %% we *must* keep track of the same id that is handed down to
+        %% wolff producers; otherwise, multiple kafka producer bridges
+        %% will install multiple handlers to the same wolff events,
+        %% multiplying the metric counts...
+        #{bridge_id => ResourceID}
     ).
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
index 44149826d..8ada818e2 100644
--- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
+++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl
@@ -390,7 +390,7 @@ t_failed_creation_then_fix(_Config) ->
     },
     {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
     ct:pal("base offset before testing ~p", [Offset]),
-    ?assertEqual(ok, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
+    ?assertEqual({async_return, ok}, ?PRODUCER:on_query(ResourceId, {send_message, Msg}, State)),
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     %% TODO: refactor those into init/end per testcase
@@ -455,7 +455,7 @@ publish_helper(#{
     StartRes = ?PRODUCER:on_start(InstId, Conf),
     {ok, State} = StartRes,
     OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
-    ok = OnQueryRes,
+    {async_return, ok} = OnQueryRes,
     {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
     ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
     ok = ?PRODUCER:on_stop(InstId, State),
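
Reviewer note (not part of the patch): the key trick in the producer module changes above is that each handler clause now pattern-matches the bridge_id in the event metadata (3rd argument) against the bridge_id stored in that handler's own config (4th argument), so a handler attached by one bridge silently ignores wolff events emitted on behalf of another bridge instead of multiplying the counts. The standalone sketch below shows the same non-linear match in isolation; the module and handler-id naming are assumptions, but the telemetry:attach_many/4, telemetry:detach/1 and telemetry:execute/3 calls are the real `telemetry` library API.

%% wolff_bridge_id_match_sketch.erl -- illustrative only, not EMQX code
-module(wolff_bridge_id_match_sketch).

-export([attach/1, detach/1, handle_event/4]).

%% Attach one handler per bridge; the handler config carries the bridge's
%% own resource id, mirroring `#{bridge_id => ResourceID}' in the patch.
attach(ResourceID) ->
    telemetry:attach_many(
        {?MODULE, ResourceID},
        [[wolff, success], [wolff, dropped]],
        fun ?MODULE:handle_event/4,
        #{bridge_id => ResourceID}
    ).

detach(ResourceID) ->
    telemetry:detach({?MODULE, ResourceID}).

%% The clause fires only when the id in the event metadata equals the id in
%% the handler config: `ID' is bound by its first occurrence and acts as an
%% equality check at the second.
handle_event([wolff, Event], #{counter_inc := Val}, #{bridge_id := ID}, #{bridge_id := ID}) when
    is_integer(Val)
->
    io:format("~p +~p for ~ts~n", [Event, Val, ID]);
handle_event(_Event, _Measurements, _Metadata, _HandlerConfig) ->
    %% another bridge's event (or an unknown event): ignore, don't double-count
    ok.

For example, after attach(<<"bridge:kafka:a">>) and attach(<<"bridge:kafka:b">>), a call to telemetry:execute([wolff, success], #{counter_inc => 1}, #{bridge_id => <<"bridge:kafka:a">>}) invokes both handlers, but only the first one's matching clause fires.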
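
Reviewer note (not part of the patch): on_query/3 now returns {async_return, ok} because wolff:send/3 only hands the message to the producer; the real outcome arrives later via the on_kafka_ack/3 callback. A minimal sketch of that convention follows, with a spawned process standing in for wolff's acknowledgement (module and message names are assumptions, not EMQX or wolff code).

%% async_return_sketch.erl -- illustrative only
-module(async_return_sketch).

-export([send/2]).

%% Hand the request off and return immediately; the caller learns about
%% success or failure from the ack message, not from the return value.
send(Caller, Request) ->
    _Pid = spawn(fun() ->
        %% stands in for the broker round trip that wolff performs
        timer:sleep(10),
        Caller ! {kafka_ack, Request, ok}
    end),
    {async_return, ok}.

The updated test assertions (?assertEqual({async_return, ok}, ...) and {async_return, ok} = OnQueryRes) match this shape directly.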