Merge pull request #12694 from thalesmg/fix-kconsu-tm-validation-r56-20240313
fix(kafka_consumer): validate topic mapping in v2 schema
This commit is contained in:
commit
d11949ac51
|
@ -32,6 +32,9 @@
|
||||||
producer_opts/1
|
producer_opts/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% Internal export to be used in v2 schema
|
||||||
|
-export([consumer_topic_mapping_validator/1]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
kafka_connector_config_fields/0,
|
kafka_connector_config_fields/0,
|
||||||
kafka_producer_converter/2,
|
kafka_producer_converter/2,
|
||||||
|
|
|
@ -65,7 +65,7 @@ fields(source_parameters) ->
|
||||||
type => hocon_schema:field_schema(Sc, type),
|
type => hocon_schema:field_schema(Sc, type),
|
||||||
required => false,
|
required => false,
|
||||||
default => [],
|
default => [],
|
||||||
validator => fun(_) -> ok end,
|
validator => fun legacy_consumer_topic_mapping_validator/1,
|
||||||
importance => ?IMPORTANCE_HIDDEN
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
},
|
},
|
||||||
{Name, hocon_schema:override(Sc, Override)};
|
{Name, hocon_schema:override(Sc, Override)};
|
||||||
|
@ -231,3 +231,9 @@ connector_example(put) ->
|
||||||
start_timeout => <<"5s">>
|
start_timeout => <<"5s">>
|
||||||
}
|
}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
legacy_consumer_topic_mapping_validator(_TopicMapping = []) ->
|
||||||
|
%% Can be (and should be, unless it has migrated from v1) empty in v2.
|
||||||
|
ok;
|
||||||
|
legacy_consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
|
||||||
|
emqx_bridge_kafka:consumer_topic_mapping_validator(TopicMapping).
|
||||||
|
|
|
@ -74,6 +74,7 @@ testcases(once) ->
|
||||||
t_node_joins_existing_cluster,
|
t_node_joins_existing_cluster,
|
||||||
t_cluster_node_down,
|
t_cluster_node_down,
|
||||||
t_multiple_topic_mappings,
|
t_multiple_topic_mappings,
|
||||||
|
t_duplicated_kafka_topics,
|
||||||
t_dynamic_mqtt_topic,
|
t_dynamic_mqtt_topic,
|
||||||
t_resource_manager_crash_after_subscriber_started,
|
t_resource_manager_crash_after_subscriber_started,
|
||||||
t_resource_manager_crash_before_subscriber_started
|
t_resource_manager_crash_before_subscriber_started
|
||||||
|
@ -292,7 +293,10 @@ end_per_group(_Group, _Config) ->
|
||||||
init_per_testcase(t_cluster_group = TestCase, Config0) ->
|
init_per_testcase(t_cluster_group = TestCase, Config0) ->
|
||||||
Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
|
Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
|
||||||
common_init_per_testcase(TestCase, Config);
|
common_init_per_testcase(TestCase, Config);
|
||||||
init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) ->
|
init_per_testcase(TestCase, Config0) when
|
||||||
|
TestCase =:= t_multiple_topic_mappings;
|
||||||
|
TestCase =:= t_duplicated_kafka_topics
|
||||||
|
->
|
||||||
KafkaTopicBase =
|
KafkaTopicBase =
|
||||||
<<
|
<<
|
||||||
(atom_to_binary(TestCase))/binary,
|
(atom_to_binary(TestCase))/binary,
|
||||||
|
@ -671,7 +675,12 @@ authentication(_) ->
|
||||||
parse_and_check(ConfigString, Name) ->
|
parse_and_check(ConfigString, Name) ->
|
||||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
#{<<"bridges">> := #{TypeBin := #{Name := _}}} =
|
||||||
|
hocon_tconf:check_plain(
|
||||||
|
emqx_bridge_schema,
|
||||||
|
RawConf,
|
||||||
|
#{required => false, atom_key => false}
|
||||||
|
),
|
||||||
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
|
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
@ -1359,6 +1368,28 @@ t_multiple_topic_mappings(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Although we have a test for the v1 schema, the v1 compatibility layer does some
|
||||||
|
%% shenanigans that do not go through V1 schema validations...
|
||||||
|
t_duplicated_kafka_topics(Config) ->
|
||||||
|
#{<<"topic_mapping">> := [#{<<"kafka_topic">> := KT} | _] = TM0} =
|
||||||
|
?config(kafka_config, Config),
|
||||||
|
TM = [M#{<<"kafka_topic">> := KT} || M <- TM0],
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertMatch(
|
||||||
|
{error, {{_, 400, _}, _, _}},
|
||||||
|
create_bridge_api(
|
||||||
|
Config,
|
||||||
|
#{<<"topic_mapping">> => TM}
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_on_get_status(Config) ->
|
t_on_get_status(Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
ProxyHost = ?config(proxy_host, Config),
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
|
Loading…
Reference in New Issue