diff --git a/.ci/docker-compose-file/kafka/kafka-entrypoint.sh b/.ci/docker-compose-file/kafka/kafka-entrypoint.sh index 336a78e74..987bfbccd 100755 --- a/.ci/docker-compose-file/kafka/kafka-entrypoint.sh +++ b/.ci/docker-compose-file/kafka/kafka-entrypoint.sh @@ -49,6 +49,9 @@ echo "+++++++ Creating Kafka Topics ++++++++" # there seem to be a race condition when creating the topics (too early) env KAFKA_CREATE_TOPICS="$KAFKA_CREATE_TOPICS_NG" KAFKA_PORT="$PORT1" create-topics.sh +# create a topic with max.message.bytes=100 +/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server "${SERVER}:${PORT1}" --topic max-100-bytes --partitions 1 --replication-factor 1 --config max.message.bytes=100 + echo "+++++++ Wait until Kafka ports are down ++++++++" bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 5236d9a0e..a0cc8def3 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index a3997ada7..f518c8d4f 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 78569b321..500c5a394 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 53f2603d4..872e0027a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -487,12 +487,17 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> %% Wolff producer never gives up retrying %% so there can only be 'ok' results. on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> - %% the ReplyFn is emqx_resource_buffer_worker:handle_async_reply/2 + %% the ReplyFn is emqx_rule_runtime:inc_action_metrics/2 apply(ReplyFn, Args ++ [ok]); on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> - %% wolff should bump the dropped_queue_full counter - %% do not apply the callback (which is basically to bump success or fail counter) - ok. + %% wolff should bump the dropped_queue_full counter in handle_telemetry_event/4 + %% so there is no need to apply the callback here + ok; +on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> + %% wolff should bump the message 'dropped' counter with handle_telemetry_event/4. + %% however 'dropped' is not mapped to EMQX metrics name + %% so we reply error here + apply(ReplyFn, Args ++ [{error, message_too_large}]). %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index b51bd196c..afd6e05a5 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -185,6 +185,10 @@ action_config(ConnectorName, Overrides) -> emqx_utils_maps:deep_merge(Cfg1, Overrides). bridge_v2_config(ConnectorName) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + bridge_v2_config(ConnectorName, KafkaTopic). + +bridge_v2_config(ConnectorName, KafkaTopic) -> #{ <<"connector">> => ConnectorName, <<"enable">> => true, @@ -209,9 +213,7 @@ bridge_v2_config(ConnectorName) -> <<"query_mode">> => <<"sync">>, <<"required_acks">> => <<"all_isr">>, <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => list_to_binary( - emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - ) + <<"topic">> => list_to_binary(KafkaTopic) }, <<"local_topic">> => <<"kafka_t/#">>, <<"resource_opts">> => #{ @@ -378,6 +380,28 @@ t_local_topic(_) -> ok = emqx_connector:remove(?TYPE, test_connector), ok. +t_message_too_large(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector4">>, "max-100-bytes"), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(?TYPE, test_connector4, ConnectorConfig), + BridgeName = test_bridge4, + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config), + BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName), + TooLargePayload = iolist_to_binary(lists:duplicate(100, 100)), + ?assertEqual(0, emqx_resource_metrics:failed_get(BridgeV2Id)), + emqx:publish(emqx_message:make(<<"kafka_t/hej">>, TooLargePayload)), + ?retry( + _Sleep0 = 50, + _Attempts0 = 100, + begin + ?assertEqual(1, emqx_resource_metrics:failed_get(BridgeV2Id)), + ok + end + ), + ok = emqx_bridge_v2:remove(?TYPE, BridgeName), + ok = emqx_connector:remove(?TYPE, test_connector4), + ok. + t_unknown_topic(_Config) -> ConnectorName = <<"test_connector">>, BridgeName = <<"test_bridge">>, diff --git a/changes/ee/fix-13079.en.md b/changes/ee/fix-13277.en.md similarity index 100% rename from changes/ee/fix-13079.en.md rename to changes/ee/fix-13277.en.md diff --git a/mix.exs b/mix.exs index 157039274..e37ab83b2 100644 --- a/mix.exs +++ b/mix.exs @@ -211,7 +211,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.10.5"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.18.0"},