From a852695950dd8658aede8ab394cf79969ebce427 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 13 Mar 2024 09:57:46 -0300 Subject: [PATCH] fix(kafka_consumer): validate topic mapping in v2 schema Fixes https://emqx.atlassian.net/browse/EMQX-12008 --- .../src/emqx_bridge_kafka.erl | 3 ++ .../src/emqx_bridge_kafka_consumer_schema.erl | 8 ++++- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 35 +++++++++++++++++-- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 84bbe01c1..f021eaa84 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -32,6 +32,9 @@ producer_opts/1 ]). +%% Internal export to be used in v2 schema +-export([consumer_topic_mapping_validator/1]). + -export([ kafka_connector_config_fields/0, kafka_producer_converter/2, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index 266d3f7d9..b5b0224de 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -65,7 +65,7 @@ fields(source_parameters) -> type => hocon_schema:field_schema(Sc, type), required => false, default => [], - validator => fun(_) -> ok end, + validator => fun legacy_consumer_topic_mapping_validator/1, importance => ?IMPORTANCE_HIDDEN }, {Name, hocon_schema:override(Sc, Override)}; @@ -231,3 +231,9 @@ connector_example(put) -> start_timeout => <<"5s">> } }. + +legacy_consumer_topic_mapping_validator(_TopicMapping = []) -> + %% Can be (and should be, unless it has migrated from v1) empty in v2. + ok; +legacy_consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> + emqx_bridge_kafka:consumer_topic_mapping_validator(TopicMapping). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 402841f99..56aabb1c3 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -74,6 +74,7 @@ testcases(once) -> t_node_joins_existing_cluster, t_cluster_node_down, t_multiple_topic_mappings, + t_duplicated_kafka_topics, t_dynamic_mqtt_topic, t_resource_manager_crash_after_subscriber_started, t_resource_manager_crash_before_subscriber_started @@ -292,7 +293,10 @@ end_per_group(_Group, _Config) -> init_per_testcase(t_cluster_group = TestCase, Config0) -> Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]), common_init_per_testcase(TestCase, Config); -init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) -> +init_per_testcase(TestCase, Config0) when + TestCase =:= t_multiple_topic_mappings; + TestCase =:= t_duplicated_kafka_topics +-> KafkaTopicBase = << (atom_to_binary(TestCase))/binary, @@ -671,7 +675,12 @@ authentication(_) -> parse_and_check(ConfigString, Name) -> {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), TypeBin = ?BRIDGE_TYPE_BIN, - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := _}}} = + hocon_tconf:check_plain( + emqx_bridge_schema, + RawConf, + #{required => false, atom_key => false} + ), #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, Config. @@ -1359,6 +1368,28 @@ t_multiple_topic_mappings(Config) -> ), ok. +%% Although we have a test for the v1 schema, the v1 compatibility layer does some +%% shenanigans that do not go through V1 schema validations... +t_duplicated_kafka_topics(Config) -> + #{<<"topic_mapping">> := [#{<<"kafka_topic">> := KT} | _] = TM0} = + ?config(kafka_config, Config), + TM = [M#{<<"kafka_topic">> := KT} || M <- TM0], + ?check_trace( + begin + ?assertMatch( + {error, {{_, 400, _}, _, _}}, + create_bridge_api( + Config, + #{<<"topic_mapping">> => TM} + ) + ), + + ok + end, + [] + ), + ok. + t_on_get_status(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config),