diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index ac5177f6e..2540b987c 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -145,7 +145,7 @@ fields(producer_opts) -> })} ]; fields(producer_mqtt_opts) -> - [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; + [{topic, mk(binary(), #{desc => ?DESC(mqtt_topic)})}]; fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 5e8635d44..09712c29d 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -33,7 +33,7 @@ on_start(InstId, Config) -> authentication := Auth, ssl := SSL } = Config, - maybe_install_wolff_telemetry_handlers(), + _ = maybe_install_wolff_telemetry_handlers(), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -226,7 +226,9 @@ producers_config(BridgeName, ClientId, Input) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - BridgeNameBin = erlang:atom_to_binary(BridgeName), + %% TODO: change this once we add kafka source + BridgeType = kafka, + ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName), partitioner => PartitionStrategy, @@ -240,8 +242,7 @@ producers_config(BridgeName, ClientId, Input) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - telemetry_meta_data => - #{bridge_id => <<<<"bridge:kafka:">>/binary, BridgeNameBin/binary>>} + telemetry_meta_data => #{bridge_id => ResourceID} }. replayq_dir(ClientId) -> @@ -280,66 +281,59 @@ handle_telemetry_event( [wolff, dropped], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:dropped_inc(ID, Val); handle_telemetry_event( [wolff, dropped_queue_full], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:dropped_queue_full_inc(ID, Val); handle_telemetry_event( [wolff, queuing], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> - emqx_resource_metrics:queuing_inc(ID, Val); + emqx_resource_metrics:queuing_change(ID, Val); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:retried_inc(ID, Val); handle_telemetry_event( [wolff, failed], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:failed_inc(ID, Val); handle_telemetry_event( [wolff, inflight], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> - emqx_resource_metrics:inflight_inc(ID, Val); + emqx_resource_metrics:inflight_change(ID, Val); handle_telemetry_event( [wolff, retried_failed], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:retried_failed_inc(ID, Val); handle_telemetry_event( [wolff, retried_success], #{counter_inc := Val}, #{bridge_id := ID}, - _ + _HandlerConfig ) when is_integer(Val) -> emqx_resource_metrics:retried_success_inc(ID, Val); -handle_telemetry_event( - [wolff, success], - #{counter_inc := Val}, - #{bridge_id := ID}, - _ -) when is_integer(Val) -> - emqx_resource_metrics:success_inc(ID, Val); -handle_telemetry_event(_EventId, _Metrics, _MetaData, _Config) -> +handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. @@ -349,6 +343,11 @@ maybe_install_wolff_telemetry_handlers() -> telemetry:attach_many( %% unique handler id <<"emqx-bridge-kafka-producer-telemetry-handler">>, + %% Note: we don't handle `[wolff, success]' because, + %% currently, we already increment the success counter for + %% this resource at `emqx_rule_runtime:handle_action' when + %% the response is `ok' and we would double increment it + %% here. [ [wolff, dropped], [wolff, dropped_queue_full], @@ -357,8 +356,7 @@ maybe_install_wolff_telemetry_handlers() -> [wolff, failed], [wolff, inflight], [wolff, retried_failed], - [wolff, retried_success], - [wolff, success] + [wolff, retried_success] ], fun emqx_bridge_impl_kafka_producer:handle_telemetry_event/4, [] diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 152862f6b..2eef1170d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -323,6 +323,22 @@ kafka_bridge_rest_api_helper(Config) -> <<"sql">> => <<"SELECT * from \"kafka_bridge_topic/#\"">> } ), + %% counters should be empty before + ?assertEqual(0, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Get offset before sending message {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), %% Send message to topic and check that it got forwarded to Kafka @@ -335,12 +351,21 @@ kafka_bridge_rest_api_helper(Config) -> {ok, {_, [KafkaMsg]}} = show(BrodOut), Body = KafkaMsg#kafka_message.value, %% Check crucial counters and gauges - 1 = emqx_resource_metrics:matched_get(ResourceId), - 1 = emqx_resource_metrics:success_get(ResourceId), - 0 = emqx_resource_metrics:dropped_get(ResourceId), - 0 = emqx_resource_metrics:failed_get(ResourceId), - 0 = emqx_resource_metrics:inflight_get(ResourceId), - 0 = emqx_resource_metrics:queuing_get(ResourceId), + ?assertEqual(1, emqx_resource_metrics:matched_get(ResourceId)), + ?assertEqual(1, emqx_resource_metrics:success_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_failed_get(ResourceId)), + ?assertEqual(0, emqx_resource_metrics:retried_success_get(ResourceId)), %% Perform operations {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), @@ -452,7 +477,7 @@ hocon_config_template() -> """ bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true -authentication = {{{ authentication }}} +authentication = {{{ authentication }}} ssl = {{{ ssl }}} producer = { mqtt {