diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index 8fe0d6351..a4bb48aec 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -90,6 +90,9 @@ fields(source_parameters) -> binary(), #{ required => false, + validator => [ + emqx_resource_validator:not_empty("Group id must not be empty") + ], desc => ?DESC(group_id) } )} diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 54f6f9efc..4de01c98d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) -> [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++ [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs]. +%% assert compatibility +bridge_schema_json_test() -> + JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()), + Map = emqx_utils_json:decode(JSON), + Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>], + ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)). + +custom_group_id_test() -> + BaseConfig = kafka_consumer_source_config(), + BadSourceConfig = emqx_utils_maps:deep_merge( + BaseConfig, + #{<<"parameters">> => #{<<"group_id">> => <<>>}} + ), + ?assertThrow( + {_, [ + #{ + path := "sources.kafka_consumer.my_consumer.parameters.group_id", + reason := "Group id must not be empty" + } + ]}, + emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig) + ), + + CustomId = <<"custom_id">>, + OkSourceConfig = emqx_utils_maps:deep_merge( + BaseConfig, + #{<<"parameters">> => #{<<"group_id">> => CustomId}} + ), + ?assertMatch( + #{<<"parameters">> := #{<<"group_id">> := CustomId}}, + emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig) + ), + + ok. + %%=========================================================================== %% Helper functions %%=========================================================================== @@ -355,9 +390,21 @@ kafka_consumer_hocon() -> "\n }" "\n }". -%% assert compatibility -bridge_schema_json_test() -> - JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()), - Map = emqx_utils_json:decode(JSON), - Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>], - ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)). +kafka_consumer_source_config() -> + #{ + <<"enable">> => true, + <<"connector">> => <<"my_connector">>, + <<"parameters">> => + #{ + <<"key_encoding_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_rejoin_attempts">> => <<"5">>, + <<"offset_reset_policy">> => <<"latest">>, + <<"topic">> => <<"please override">>, + <<"value_encoding_mode">> => <<"none">> + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"2s">>, + <<"resume_interval">> => <<"2s">> + } + }.