diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 6f18b9135..9f67caf5d 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -30,7 +30,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.16"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.2.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.19"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b2dda453d..b3ceba9ca 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -900,7 +900,7 @@ format_resource( case emqx_bridge_v2:is_bridge_v2_type(Type) of true -> %% The defaults are already filled in - RawConf; + downgrade_raw_conf(Type, RawConf); false -> fill_defaults(Type, RawConf) end, @@ -1164,3 +1164,19 @@ upgrade_type(Type) -> downgrade_type(Type) -> emqx_bridge_lib:downgrade_type(Type). + +%% TODO: move it to callback +downgrade_raw_conf(kafka_producer, RawConf) -> + rename(<<"parameters">>, <<"kafka">>, RawConf); +downgrade_raw_conf(azure_event_hub_producer, RawConf) -> + rename(<<"parameters">>, <<"kafka">>, RawConf); +downgrade_raw_conf(_Type, RawConf) -> + RawConf. + +rename(OldKey, NewKey, Map) -> + case maps:find(OldKey, Map) of + {ok, Value} -> + maps:remove(OldKey, maps:put(NewKey, Value, Map)); + error -> + Map + end. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index 8a6c3ba12..d6d8eb9a1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -164,9 +164,8 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> end. common_field_names() -> - %% TODO: add 'config' to the list [ - enable, description, local_topic, connector, resource_opts + enable, description, local_topic, connector, resource_opts, parameters ]. -endif. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index 047519151..bf2cf5438 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -82,7 +82,7 @@ fields("config_bridge_v2") -> fields(actions); fields("config_connector") -> Fields = override( - emqx_bridge_kafka:fields(kafka_connector), + emqx_bridge_kafka:fields("config_connector"), connector_overrides() ), override_documentations(Fields); @@ -117,7 +117,7 @@ fields(kafka_message) -> fields(actions) -> Fields = override( - emqx_bridge_kafka:fields(producer_opts), + emqx_bridge_kafka:producer_opts(), bridge_v2_overrides() ) ++ [ @@ -267,7 +267,7 @@ values(common_config) -> }; values(producer) -> #{ - kafka => #{ + parameters => #{ topic => <<"topic">>, message => #{ key => <<"${.clientid}">>, @@ -378,18 +378,27 @@ producer_overrides() -> ) } ), + %% NOTE: field 'kafka' is renamed to 'parameters' since e5.3.1 + %% We will keep 'kafka' for backward compatibility. + %% TODO: delete this override when we upgrade bridge schema json to 0.2.0 + %% See emqx_conf:bridge_schema_json/0 kafka => mk(ref(producer_kafka_opts), #{ required => true, validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1 }), + parameters => + mk(ref(producer_kafka_opts), #{ + required => true, + validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1 + }), ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}), type => mk(azure_event_hub_producer, #{required => true}) }. bridge_v2_overrides() -> #{ - kafka => + parameters => mk(ref(producer_kafka_opts), #{ required => true, validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1 diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 84d123ffe..5b83a6af2 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -28,10 +28,14 @@ fields/1, desc/1, host_opts/0, - ssl_client_opts_fields/0 + ssl_client_opts_fields/0, + producer_opts/0 ]). --export([kafka_producer_converter/2, producer_strategy_key_validator/1]). +-export([ + kafka_producer_converter/2, + producer_strategy_key_validator/1 +]). %% ------------------------------------------------------------------------------------------------- %% api @@ -251,15 +255,13 @@ fields("get_" ++ Type) -> fields("config_bridge_v2") -> fields(kafka_producer_action); fields("config_connector") -> - fields(kafka_connector); + connector_config_fields(); fields("config_producer") -> fields(kafka_producer); fields("config_consumer") -> fields(kafka_consumer); -fields(kafka_connector) -> - fields("config"); fields(kafka_producer) -> - fields("config") ++ fields(producer_opts); + connector_config_fields() ++ producer_opts(); fields(kafka_producer_action) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -268,49 +270,9 @@ fields(kafka_producer_action) -> desc => ?DESC(emqx_connector_schema, "connector_field"), required => true })}, {description, emqx_schema:description_schema()} - ] ++ fields(producer_opts); + ] ++ producer_opts(); fields(kafka_consumer) -> - fields("config") ++ fields(consumer_opts); -fields("config") -> - [ - {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {description, emqx_schema:description_schema()}, - {bootstrap_hosts, - mk( - binary(), - #{ - required => true, - desc => ?DESC(bootstrap_hosts), - validator => emqx_schema:servers_validator( - host_opts(), _Required = true - ) - } - )}, - {connect_timeout, - mk(emqx_schema:timeout_duration_ms(), #{ - default => <<"5s">>, - desc => ?DESC(connect_timeout) - })}, - {min_metadata_refresh_interval, - mk( - emqx_schema:timeout_duration_ms(), - #{ - default => <<"3s">>, - desc => ?DESC(min_metadata_refresh_interval) - } - )}, - {metadata_request_timeout, - mk(emqx_schema:timeout_duration_ms(), #{ - default => <<"5s">>, - desc => ?DESC(metadata_request_timeout) - })}, - {authentication, - mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{ - default => none, desc => ?DESC("authentication") - })}, - {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, - {ssl, mk(ref(ssl_client_opts), #{})} - ]; + connector_config_fields() ++ fields(consumer_opts); fields(ssl_client_opts) -> ssl_client_opts_fields(); fields(auth_username_password) -> @@ -369,20 +331,6 @@ fields(socket_opts) -> validator => fun emqx_schema:validate_tcp_keepalive/1 })} ]; -fields(producer_opts) -> - [ - %% Note: there's an implicit convention in `emqx_bridge' that, - %% for egress bridges with this config, the published messages - %% will be forwarded to such bridges. - {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, - {kafka, - mk(ref(producer_kafka_opts), #{ - required => true, - desc => ?DESC(producer_kafka_opts), - validator => fun producer_strategy_key_validator/1 - })}, - {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} - ]; fields(producer_kafka_opts) -> [ {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, @@ -580,7 +528,7 @@ fields(resource_opts) -> CreationOpts = emqx_resource_schema:create_opts(_Overrides = []), lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts). -desc("config") -> +desc("config_connector") -> ?DESC("desc_config"); desc(resource_opts) -> ?DESC(emqx_resource_schema, "resource_opts"); @@ -599,34 +547,86 @@ desc("post_" ++ Type) when desc(kafka_producer_action) -> ?DESC("kafka_producer_action"); desc(Name) -> - lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), ?DESC(Name). -struct_names() -> +connector_config_fields() -> [ - auth_gssapi_kerberos, - auth_username_password, - kafka_message, - kafka_producer, - kafka_consumer, - producer_buffer, - producer_kafka_opts, - socket_opts, - producer_opts, - consumer_opts, - consumer_kafka_opts, - consumer_topic_mapping, - producer_kafka_ext_headers, - ssl_client_opts + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, + {bootstrap_hosts, + mk( + binary(), + #{ + required => true, + desc => ?DESC(bootstrap_hosts), + validator => emqx_schema:servers_validator( + host_opts(), _Required = true + ) + } + )}, + {connect_timeout, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"5s">>, + desc => ?DESC(connect_timeout) + })}, + {min_metadata_refresh_interval, + mk( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"3s">>, + desc => ?DESC(min_metadata_refresh_interval) + } + )}, + {metadata_request_timeout, + mk(emqx_schema:timeout_duration_ms(), #{ + default => <<"5s">>, + desc => ?DESC(metadata_request_timeout) + })}, + {authentication, + mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{ + default => none, desc => ?DESC("authentication") + })}, + {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, + {ssl, mk(ref(ssl_client_opts), #{})} ]. +producer_opts() -> + [ + %% Note: there's an implicit convention in `emqx_bridge' that, + %% for egress bridges with this config, the published messages + %% will be forwarded to such bridges. + {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, + parameters_field(), + {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} + ]. + +%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters' +%% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0) +%% since schema is data for the 'schemas' API. +parameters_field() -> + {Name, Alias} = + case get(emqx_bridge_schema_version) of + <<"0.1.0">> -> + {kafka, parameters}; + _ -> + {parameters, kafka} + end, + {Name, + mk(ref(producer_kafka_opts), #{ + required => true, + aliases => [Alias], + desc => ?DESC(producer_kafka_opts), + validator => fun producer_strategy_key_validator/1 + })}. + %% ------------------------------------------------------------------------------------------------- %% internal type_field(BridgeV2Type) when BridgeV2Type =:= "connector"; BridgeV2Type =:= "bridge_v2" -> {type, mk(enum([kafka_producer]), #{required => true, desc => ?DESC("desc_type")})}; type_field(_) -> {type, - mk(enum([kafka_consumer, kafka, kafka_producer]), #{ + %% 'kafka' is kept for backward compatibility + mk(enum([kafka, kafka_producer, kafka_consumer]), #{ required => true, desc => ?DESC("desc_type") })}. @@ -641,17 +641,23 @@ kafka_producer_converter(undefined, _HoconOpts) -> kafka_producer_converter( #{<<"producer">> := OldOpts0, <<"bootstrap_hosts">> := _} = Config0, _HoconOpts ) -> - %% old schema + %% prior to e5.0.2 MQTTOpts = maps:get(<<"mqtt">>, OldOpts0, #{}), LocalTopic = maps:get(<<"topic">>, MQTTOpts, undefined), KafkaOpts = maps:get(<<"kafka">>, OldOpts0), Config = maps:without([<<"producer">>], Config0), case LocalTopic =:= undefined of true -> - Config#{<<"kafka">> => KafkaOpts}; + Config#{<<"parameters">> => KafkaOpts}; false -> - Config#{<<"kafka">> => KafkaOpts, <<"local_topic">> => LocalTopic} + Config#{<<"parameters">> => KafkaOpts, <<"local_topic">> => LocalTopic} end; +kafka_producer_converter( + #{<<"kafka">> := _} = Config0, _HoconOpts +) -> + %% from e5.0.2 to e5.3.0 + {KafkaOpts, Config} = maps:take(<<"kafka">>, Config0), + Config#{<<"parameters">> => KafkaOpts}; kafka_producer_converter(Config, _HoconOpts) -> %% new schema Config. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 42585c3e3..5461829fa 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -35,7 +35,7 @@ -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). -query_mode(#{kafka := #{query_mode := sync}}) -> +query_mode(#{parameters := #{query_mode := sync}}) -> simple_sync_internal_buffer; query_mode(_) -> simple_async_internal_buffer. @@ -111,7 +111,7 @@ create_producers_for_bridge_v2( ClientId, #{ bridge_type := BridgeType, - kafka := KafkaConfig + parameters := KafkaConfig } ) -> #{ diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 77e7e9215..1d9682b9b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -6,6 +6,10 @@ -include_lib("eunit/include/eunit.hrl"). +-export([atoms/0]). +%% ensure atoms exist +atoms() -> [myproducer, my_consumer]. + %%=========================================================================== %% Test cases %%=========================================================================== @@ -14,7 +18,6 @@ kafka_producer_test() -> Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)), Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)), Conf3 = parse(kafka_producer_new_hocon()), - ?assertMatch( #{ <<"bridges">> := @@ -22,7 +25,7 @@ kafka_producer_test() -> <<"kafka_producer">> := #{ <<"myproducer">> := - #{<<"kafka">> := #{}} + #{<<"parameters">> := #{}} } } }, @@ -49,7 +52,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"kafka">> := #{}, + <<"parameters">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -65,7 +68,7 @@ kafka_producer_test() -> #{ <<"myproducer">> := #{ - <<"kafka">> := #{}, + <<"parameters">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } @@ -156,12 +159,14 @@ message_key_dispatch_validations_test() -> <<"message">> := #{<<"key">> := <<>>} } }, - emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf) + emqx_utils_maps:deep_get( + [<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf + ) ), ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.kafka", + path := "bridges.kafka_producer.myproducer.parameters", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, @@ -170,7 +175,7 @@ message_key_dispatch_validations_test() -> ?assertThrow( {_, [ #{ - path := "bridges.kafka_producer.myproducer.kafka", + path := "bridges.kafka_producer.myproducer.parameters", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, @@ -181,8 +186,6 @@ message_key_dispatch_validations_test() -> tcp_keepalive_validation_test_() -> ProducerConf = parse(kafka_producer_new_hocon()), ConsumerConf = parse(kafka_consumer_hocon()), - %% ensure atoms exist - _ = [my_producer, my_consumer], test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++ test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf). @@ -358,3 +361,10 @@ bridges.kafka_consumer.my_consumer { } } """. + +%% assert compatibility +bridge_schema_json_test() -> + JSON = iolist_to_binary(emqx_conf:bridge_schema_json()), + Map = emqx_utils_json:decode(JSON), + Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>], + ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)). diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 1efeb4d69..78a39f5dd 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -188,8 +188,14 @@ hotconf_schema_json() -> %% TODO: move this function to emqx_dashboard when we stop generating this JSON at build time. bridge_schema_json() -> - SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>}, - gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo). + Version = <<"0.1.0">>, + SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version}, + put(emqx_bridge_schema_version, Version), + try + gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo) + after + erase(emqx_bridge_schema_version) + end. %% TODO: remove it and also remove hocon_md.erl and friends. %% markdown generation from schema is a failure and we are moving to an interactive diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 52466e9da..c168a0a18 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -453,9 +453,30 @@ update_connector(ConnectorType, ConnectorName, Conf) -> create_or_update_connector(ConnectorType, ConnectorName, Conf, 200). create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -> + Check = + try + is_binary(ConnectorType) andalso emqx_resource:validate_type(ConnectorType), + ok = emqx_resource:validate_name(ConnectorName) + catch + throw:Error -> + ?BAD_REQUEST(map_to_json(Error)) + end, + case Check of + ok -> + do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode); + BadRequest -> + BadRequest + end. + +do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -> case emqx_connector:create(ConnectorType, ConnectorName, Conf) of {ok, _} -> lookup_from_all_nodes(ConnectorType, ConnectorName, HttpStatusCode); + {error, {PreOrPostConfigUpdate, _HandlerMod, Reason}} when + PreOrPostConfigUpdate =:= pre_config_update; + PreOrPostConfigUpdate =:= post_config_update + -> + ?BAD_REQUEST(map_to_json(redact(Reason))); {error, Reason} when is_map(Reason) -> ?BAD_REQUEST(map_to_json(redact(Reason))) end. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c07856c55..c8ec8e1be 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -43,7 +43,7 @@ connector_structs() -> [ {kafka_producer, mk( - hoconsc:map(name, ref(emqx_bridge_kafka, "config")), + hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), #{ desc => <<"Kafka Connector Config">>, required => false diff --git a/mix.exs b/mix.exs index 52e63879a..b50be1835 100644 --- a/mix.exs +++ b/mix.exs @@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.39.16", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.39.19", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index 3ba8edc4b..d2512ece3 100644 --- a/rebar.config +++ b/rebar.config @@ -75,7 +75,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.19"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index f775c8bb4..86e417be1 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -283,13 +283,6 @@ config_enable.desc: config_enable.label: """Enable or Disable""" - -config_connector.desc: -"""Reference to connector""" - -config_connector.label: -"""Connector""" - consumer_mqtt_payload.desc: """The template for transforming the incoming Kafka message. By default, it will use JSON format to serialize inputs from the Kafka message. Such fields are: headers: an object containing string key-value pairs. @@ -316,10 +309,10 @@ kafka_consumer.label: """Kafka Consumer""" desc_config.desc: -"""Configuration for a Kafka bridge.""" +"""Configuration for a Kafka Producer Client.""" desc_config.label: -"""Kafka Bridge Configuration""" +"""Kafka Producer Client Configuration""" consumer_value_encoding_mode.desc: """Defines how the value from the Kafka message is encoded before being forwarded via MQTT.