From 3ac4ddcbe376deb930ef33dced07d238c6a99354 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 17 Jun 2024 17:11:48 +0200 Subject: [PATCH] fix(kafka): handle message_too_large bump 'failed' counter --- .../src/emqx_bridge_kafka_impl_producer.erl | 13 +++++--- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 30 +++++++++++++++++-- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 53f2603d4..872e0027a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -487,12 +487,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% Wolff producer never gives up retrying %% so there can only be 'ok' results. on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> - %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2 + %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2 apply(ReplyFn, Args ++ [ok]); on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> - %% wolff should bump the dropped_queue_full counter - %% do not apply the callback (which is basically to bump success or fail counter) - ok. + %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4 + %% so there is no need to apply the callback here + ok; +on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> + %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4. + %% however 'dropped' is not mapped to EMQX metrics name + %% so we reply error here + apply(ReplyFn, Args ++ [{error, message_too_large}]). %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index b51bd196c..afd6e05a5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) -> emqx_utils_maps:deep_merge(Cfg1, Overrides). bridge_v2_config(ConnectorName) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + bridge_v2_config(ConnectorName, KafkaTopic). + +bridge_v2_config(ConnectorName, KafkaTopic) -> #{ <<"connector">> => ConnectorName, <<"enable">> => true, @@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) -> <<"query_mode">> => <<"sync">>, <<"required_acks">> => <<"all_isr">>, <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => list_to_binary( - emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - ) + <<"topic">> => list_to_binary(KafkaTopic) }, <<"local_topic">> => <<"kafka_t/#">>, <<"resource_opts">> => #{ @@ -378,6 +380,28 @@ t_local_topic(_) -> ok = emqx_connector:remove(?TYPE, test_connector), ok. +t_message_too_large(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig), + BridgeName = test_bridge4, + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config), + BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName), + TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)), + ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)), + emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)), + ?retry( + _Sleep0 = 50, + _Attempts0 = 100, + begin + ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)), + ok + end + ), + ok = emqx_bridge_v2:remove(?TYPE, BridgeName), + ok = emqx_connector:remove(?TYPE, test_connector4), + ok. + t_unknown_topic(_Config) -> ConnectorName = <<"test_connector">>, BridgeName = <<"test_bridge">>,