diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 2d7f5b5be..0746736f3 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -15,7 +15,8 @@ api_schemas(Method) -> [ ref(emqx_ee_bridge_gcp_pubsub, Method), - ref(emqx_ee_bridge_kafka, Method), + ref(emqx_ee_bridge_kafka, Method ++ "_consumer"), + ref(emqx_ee_bridge_kafka, Method ++ "_producer"), ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_pgsql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), @@ -147,9 +148,9 @@ kafka_structs() -> {Type, mk( hoconsc:map(name, ref(emqx_ee_bridge_kafka, Type)), - #{desc => <<"EMQX Enterprise Config">>, required => false} + #{desc => <<"Kafka ", Name/binary, " Bridge Config">>, required => false} )} - || Type <- [kafka_producer, kafka_consumer] + || {Type, Name} <- [{kafka_producer, <<"Producer">>}, {kafka_consumer, <<"Consumer">>}] ]. influxdb_structs() -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index f812901be..d8f7f7fc8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -36,8 +36,14 @@ conn_bridge_examples(Method) -> [ #{ - <<"kafka">> => #{ - summary => <<"Kafka Bridge">>, + <<"kafka_producer">> => #{ + summary => <<"Kafka Producer Bridge">>, + value => values(Method) + } + }, + #{ + <<"kafka_consumer">> => #{ + summary => <<"Kafka Consumer Bridge">>, value => values(Method) } } @@ -60,14 +66,18 @@ host_opts() -> namespace() -> "bridge_kafka". -roots() -> ["config"]. +roots() -> ["config_consumer", "config_producer"]. -fields("post") -> - [type_field(), name_field() | fields("config")]; -fields("put") -> - fields("config"); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"); +fields("post_" ++ Type) -> + [type_field(), name_field() | fields("config_" ++ Type)]; +fields("put_" ++ Type) -> + fields("config_" ++ Type); +fields("get_" ++ Type) -> + emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type); +fields("config_producer") -> + fields(kafka_producer); +fields("config_consumer") -> + fields(kafka_consumer); fields(kafka_producer) -> fields("config") ++ fields(producer_opts); fields(kafka_consumer) -> @@ -292,8 +302,12 @@ fields(consumer_kafka_opts) -> desc("config") -> ?DESC("desc_config"); -desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for Kafka using `", string:to_upper(Method), "` method."]; +desc("get_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> + ["Configuration for Kafka using `GET` method."]; +desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> + ["Configuration for Kafka using `PUT` method."]; +desc("post_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> + ["Configuration for Kafka using `POST` method."]; desc(Name) -> lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), ?DESC(Name). @@ -317,7 +331,8 @@ struct_names() -> %% ------------------------------------------------------------------------------------------------- %% internal type_field() -> - {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}. + {type, + mk(enum([kafka_consumer, kafka_producer]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.