diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
index 4db685657..394093918 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
@@ -92,6 +92,18 @@ values(producer) ->
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
+ kafka_headers => <<"${pub_props}">>,
+ kafka_ext_headers => [
+ #{
+ kafka_ext_header_key => <<"clientid">>,
+ kafka_ext_header_value => <<"${clientid}">>
+ },
+ #{
+ kafka_ext_header_key => <<"topic">>,
+ kafka_ext_header_value => <<"${topic}">>
+ }
+ ],
+ kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
@@ -281,6 +293,31 @@ fields(producer_kafka_opts) ->
desc => ?DESC(required_acks)
}
)},
+ {kafka_headers,
+ mk(
+ binary(),
+ #{
+ required => false,
+ validator => fun kafka_header_validator/1,
+ desc => ?DESC(kafka_headers)
+ }
+ )},
+ {kafka_ext_headers,
+ mk(
+ hoconsc:array(ref(producer_kafka_ext_headers)),
+ #{
+ desc => ?DESC(producer_kafka_ext_headers),
+ required => false
+ }
+ )},
+ {kafka_header_value_encode_mode,
+ mk(
+ enum([none, json]),
+ #{
+ default => none,
+ desc => ?DESC(kafka_header_value_encode_mode)
+ }
+ )},
{partition_count_refresh_interval,
mk(
emqx_schema:timeout_duration_s(),
@@ -319,6 +356,23 @@ fields(producer_kafka_opts) ->
}
)}
];
+fields(producer_kafka_ext_headers) ->
+ [
+ {kafka_ext_header_key,
+ mk(
+ binary(),
+ #{required => true, desc => ?DESC(producer_kafka_ext_header_key)}
+ )},
+ {kafka_ext_header_value,
+ mk(
+ binary(),
+ #{
+ required => true,
+ validator => fun kafka_ext_header_value_validator/1,
+ desc => ?DESC(producer_kafka_ext_header_value)
+ }
+ )}
+ ];
fields(kafka_message) ->
[
{key, mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC(kafka_message_key)})},
@@ -433,7 +487,8 @@ struct_names() ->
producer_opts,
consumer_opts,
consumer_kafka_opts,
- consumer_topic_mapping
+ consumer_topic_mapping,
+ producer_kafka_ext_headers
].
%% -------------------------------------------------------------------------------------------------
@@ -491,3 +546,27 @@ producer_strategy_key_validator(#{
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
+
+kafka_header_validator(undefined) ->
+ ok;
+kafka_header_validator(Value) ->
+ case emqx_placeholder:preproc_tmpl(Value) of
+ [{var, _}] ->
+ ok;
+ _ ->
+ {error, "The 'kafka_headers' must be a single placeholder like ${pub_props}"}
+ end.
+
+kafka_ext_header_value_validator(undefined) ->
+ ok;
+kafka_ext_header_value_validator(Value) ->
+ case emqx_placeholder:preproc_tmpl(Value) of
+ [{Type, _}] when Type =:= var orelse Type =:= str ->
+ ok;
+ _ ->
+ {
+ error,
+ "The value of 'kafka_ext_headers' must either be a single "
+ "placeholder like ${foo}, or a simple string."
+ }
+ end.
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
index be4c2d860..4fa188c95 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
@@ -57,6 +57,9 @@ on_start(InstId, Config) ->
socket_opts := SocketOpts,
ssl := SSL
} = Config,
+ KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
+ KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
+ KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
BridgeType = ?BRIDGE_TYPE,
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
@@ -116,7 +119,10 @@ on_start(InstId, Config) ->
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout,
hosts => Hosts,
- kafka_config => KafkaConfig
+ kafka_config => KafkaConfig,
+ headers_tokens => KafkaHeadersTokens,
+ ext_headers_tokens => KafkaExtHeadersTokens,
+ headers_val_encode_mode => KafkaHeadersValEncodeMode
}};
{error, Reason2} ->
?SLOG(error, #{
@@ -210,11 +216,19 @@ on_query(
#{
message_template := Template,
producers := Producers,
- sync_query_timeout := SyncTimeout
+ sync_query_timeout := SyncTimeout,
+ headers_tokens := KafkaHeadersTokens,
+ ext_headers_tokens := KafkaExtHeadersTokens,
+ headers_val_encode_mode := KafkaHeadersValEncodeMode
}
) ->
- ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}),
- KafkaMessage = render_message(Template, Message),
+ KafkaHeaders = #{
+ headers_tokens => KafkaHeadersTokens,
+ ext_headers_tokens => KafkaExtHeadersTokens,
+ headers_val_encode_mode => KafkaHeadersValEncodeMode
+ },
+ KafkaMessage = render_message(Template, KafkaHeaders, Message),
+ ?tp(emqx_bridge_kafka_impl_producer_sync_query, KafkaHeaders),
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
@@ -235,10 +249,21 @@ on_query_async(
_InstId,
{send_message, Message},
AsyncReplyFn,
- #{message_template := Template, producers := Producers}
+ #{
+ message_template := Template,
+ producers := Producers,
+ headers_tokens := KafkaHeadersTokens,
+ ext_headers_tokens := KafkaExtHeadersTokens,
+ headers_val_encode_mode := KafkaHeadersValEncodeMode
+ }
) ->
- ?tp(emqx_bridge_kafka_impl_producer_async_query, #{}),
- KafkaMessage = render_message(Template, Message),
+ KafkaHeaders = #{
+ headers_tokens => KafkaHeadersTokens,
+ ext_headers_tokens => KafkaExtHeadersTokens,
+ headers_val_encode_mode => KafkaHeadersValEncodeMode
+ },
+ KafkaMessage = render_message(Template, KafkaHeaders, Message),
+ ?tp(emqx_bridge_kafka_impl_producer_async_query, KafkaHeaders),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
@@ -264,11 +289,25 @@ preproc_tmpl(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl).
render_message(
- #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
+ #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
+ #{
+ headers_tokens := KafkaHeadersTokens,
+ ext_headers_tokens := KafkaExtHeadersTokens,
+ headers_val_encode_mode := KafkaHeadersValEncodeMode
+ },
+ Message
) ->
+ ExtHeaders = proc_ext_headers(KafkaExtHeadersTokens, Message),
+ KafkaHeaders =
+ case KafkaHeadersTokens of
+ undefined -> ExtHeaders;
+ HeadersTks -> merge_kafka_headers(HeadersTks, ExtHeaders, Message)
+ end,
+ Headers = formalize_kafka_headers(KafkaHeaders, KafkaHeadersValEncodeMode),
#{
key => render(KeyTemplate, Message),
value => render(ValueTemplate, Message),
+ headers => Headers,
ts => render_timestamp(TimestampTemplate, Message)
}.
@@ -509,3 +548,105 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% multiplying the metric counts...
#{bridge_id => ResourceID}
).
+
+preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->
+ undefined;
+preproc_kafka_headers(HeadersTmpl) ->
+ %% the value is already validated by schema, so we do not validate it again.
+ emqx_placeholder:preproc_tmpl(HeadersTmpl).
+
+preproc_ext_headers(Headers) ->
+ [
+ {emqx_placeholder:preproc_tmpl(K), preproc_ext_headers_value(V)}
+ || #{kafka_ext_header_key := K, kafka_ext_header_value := V} <- Headers
+ ].
+
+preproc_ext_headers_value(ValTmpl) ->
+ %% the value is already validated by schema, so we do not validate it again.
+ emqx_placeholder:preproc_tmpl(ValTmpl).
+
+proc_ext_headers(ExtHeaders, Msg) ->
+ lists:filtermap(
+ fun({KTks, VTks}) ->
+ try
+ Key = proc_ext_headers_key(KTks, Msg),
+ Val = proc_ext_headers_value(VTks, Msg),
+ {true, {Key, Val}}
+ catch
+ throw:placeholder_not_found -> false
+ end
+ end,
+ ExtHeaders
+ ).
+
+proc_ext_headers_key(KeyTks, Msg) ->
+ RawList = emqx_placeholder:proc_tmpl(KeyTks, Msg, #{return => rawlist}),
+ list_to_binary(
+ lists:map(
+ fun
+ (undefined) -> throw(placeholder_not_found);
+ (Key) -> bin(Key)
+ end,
+ RawList
+ )
+ ).
+
+proc_ext_headers_value(ValTks, Msg) ->
+ case emqx_placeholder:proc_tmpl(ValTks, Msg, #{return => rawlist}) of
+ [undefined] -> throw(placeholder_not_found);
+ [Value] -> Value
+ end.
+
+kvlist_headers([], Acc) ->
+ lists:reverse(Acc);
+kvlist_headers([#{key := K, value := V} | Headers], Acc) ->
+ kvlist_headers(Headers, [{K, V} | Acc]);
+kvlist_headers([#{<<"key">> := K, <<"value">> := V} | Headers], Acc) ->
+ kvlist_headers(Headers, [{K, V} | Acc]);
+kvlist_headers([{K, V} | Headers], Acc) ->
+ kvlist_headers(Headers, [{K, V} | Acc]);
+kvlist_headers([BadHeader | _], _) ->
+ throw({bad_kafka_header, BadHeader}).
+
+-define(IS_STR_KEY(K), (is_list(K) orelse is_atom(K) orelse is_binary(K))).
+formalize_kafka_headers(Headers, none) ->
+ %% Note that we will drop all the non-binary values in the NONE mode
+ [{bin(K), V} || {K, V} <- Headers, is_binary(V) andalso ?IS_STR_KEY(K)];
+formalize_kafka_headers(Headers, json) ->
+ lists:filtermap(
+ fun({K, V}) ->
+ try
+ {true, {bin(K), emqx_utils_json:encode(V)}}
+ catch
+ _:_ -> false
+ end
+ end,
+ Headers
+ ).
+
+merge_kafka_headers(HeadersTks, ExtHeaders, Msg) ->
+ case emqx_placeholder:proc_tmpl(HeadersTks, Msg, #{return => rawlist}) of
+ % Headers by map object.
+ [Map] when is_map(Map) ->
+ maps:to_list(Map) ++ ExtHeaders;
+ [KVList] when is_list(KVList) ->
+ kvlist_headers(KVList, []) ++ ExtHeaders;
+ %% the placeholder cannot be found in Msg
+ [undefined] ->
+ ExtHeaders;
+ [MaybeJson] when is_binary(MaybeJson) ->
+ case emqx_utils_json:safe_decode(MaybeJson) of
+ {ok, JsonTerm} when is_map(JsonTerm) ->
+ maps:to_list(JsonTerm) ++ ExtHeaders;
+ {ok, JsonTerm} when is_list(JsonTerm) ->
+ kvlist_headers(JsonTerm, []) ++ ExtHeaders;
+ _ ->
+ throw({bad_kafka_headers, MaybeJson})
+ end;
+ BadHeaders ->
+ throw({bad_kafka_headers, BadHeaders})
+ end.
+
+bin(B) when is_binary(B) -> B;
+bin(L) when is_list(L) -> erlang:list_to_binary(L);
+bin(A) when is_atom(A) -> erlang:atom_to_binary(A, utf8).
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
index 6031c21cb..38d58c1e7 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
@@ -504,6 +504,180 @@ t_table_removed(_Config) ->
delete_all_bridges(),
ok.
+t_send_message_with_headers(Config) ->
+ HostsString = kafka_hosts_string_sasl(),
+ AuthSettings = valid_sasl_plain_settings(),
+ Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+ Type = ?BRIDGE_TYPE,
+ Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+ ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+ BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
+ KafkaTopic = "test-topic-one-partition",
+ Conf = config_with_headers(#{
+ "authentication" => AuthSettings,
+ "kafka_hosts_string" => HostsString,
+ "kafka_topic" => KafkaTopic,
+ "instance_id" => ResourceId,
+ "kafka_headers" => <<"${pub_props}">>,
+ "kafka_ext_headers" => emqx_utils_json:encode(
+ [
+ #{
+ <<"kafka_ext_header_key">> => <<"clientid">>,
+ <<"kafka_ext_header_value">> => <<"${clientid}">>
+ },
+ #{
+ <<"kafka_ext_header_key">> => <<"payload">>,
+ <<"kafka_ext_header_value">> => <<"${payload}">>
+ }
+ ]
+ ),
+ "producer" => #{
+ "kafka" => #{
+ "buffer" => #{
+ "memory_overload_protection" => false
+ }
+ }
+ },
+ "ssl" => #{}
+ }),
+ {ok, #{config := ConfigAtom1}} = emqx_bridge:create(
+ Type, erlang:list_to_atom(Name), Conf
+ ),
+ ConfigAtom = ConfigAtom1#{bridge_name => Name},
+ {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
+ Time = erlang:unique_integer(),
+ BinTime = integer_to_binary(Time),
+ Msg = #{
+ clientid => BinTime,
+ payload => <<"payload">>,
+ timestamp => Time
+ },
+ {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
+ ct:pal("base offset before testing ~p", [Offset]),
+ Kind =
+ case proplists:get_value(query_api, Config) of
+ on_query -> emqx_bridge_kafka_impl_producer_sync_query;
+ on_query_async -> emqx_bridge_kafka_impl_producer_async_query
+ end,
+ ?check_trace(
+ begin
+ ok = send(Config, ResourceId, Msg, State)
+ end,
+ fun(Trace) ->
+ ?assertMatch(
+ [
+ #{
+ ext_headers_tokens := [
+ {
+ [{str, <<"clientid">>}],
+ [{var, [<<"clientid">>]}]
+ },
+ {
+ [{str, <<"payload">>}],
+ [{var, [<<"payload">>]}]
+ }
+ ],
+ headers_tokens := [{var, [<<"pub_props">>]}],
+ headers_val_encode_mode := json
+ }
+ ],
+ ?of_kind(Kind, Trace)
+ )
+ end
+ ),
+ {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
+ ?assertMatch(
+ #kafka_message{
+ headers = [
+ {<<"clientid">>, _},
+ {<<"payload">>, <<"\"payload\"">>}
+ ],
+ key = BinTime
+ },
+ KafkaMsg
+ ),
+ ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
+ %% TODO: refactor those into init/end per testcase
+ ok = ?PRODUCER:on_stop(ResourceId, State),
+ ?assertEqual([], supervisor:which_children(wolff_client_sup)),
+ ?assertEqual([], supervisor:which_children(wolff_producers_sup)),
+ ok = emqx_bridge_resource:remove(BridgeId),
+ delete_all_bridges(),
+ ok.
+
+t_wrong_headers(_Config) ->
+ HostsString = kafka_hosts_string_sasl(),
+ AuthSettings = valid_sasl_plain_settings(),
+ Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
+ Type = ?BRIDGE_TYPE,
+ Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
+ ResourceId = emqx_bridge_resource:resource_id(Type, Name),
+ KafkaTopic = "test-topic-one-partition",
+ ?assertThrow(
+ {
+ emqx_bridge_schema,
+ [
+ #{
+ kind := validation_error,
+ reason := "The 'kafka_headers' must be a single placeholder like ${pub_props}"
+ }
+ ]
+ },
+ config_with_headers(#{
+ "authentication" => AuthSettings,
+ "kafka_hosts_string" => HostsString,
+ "kafka_topic" => KafkaTopic,
+ "instance_id" => ResourceId,
+ "kafka_headers" => <<"wrong_header">>,
+ "kafka_ext_headers" => <<"[]">>,
+ "producer" => #{
+ "kafka" => #{
+ "buffer" => #{
+ "memory_overload_protection" => false
+ }
+ }
+ },
+ "ssl" => #{}
+ })
+ ),
+ ?assertThrow(
+ {
+ emqx_bridge_schema,
+ [
+ #{
+ kind := validation_error,
+ reason :=
+ "The value of 'kafka_ext_headers' must either be a single "
+ "placeholder like ${foo}, or a simple string."
+ }
+ ]
+ },
+ config_with_headers(#{
+ "authentication" => AuthSettings,
+ "kafka_hosts_string" => HostsString,
+ "kafka_topic" => KafkaTopic,
+ "instance_id" => ResourceId,
+ "kafka_headers" => <<"${pub_props}">>,
+ "kafka_ext_headers" => emqx_utils_json:encode(
+ [
+ #{
+ <<"kafka_ext_header_key">> => <<"clientid">>,
+ <<"kafka_ext_header_value">> => <<"wrong ${header}">>
+ }
+ ]
+ ),
+ "producer" => #{
+ "kafka" => #{
+ "buffer" => #{
+ "memory_overload_protection" => false
+ }
+ }
+ },
+ "ssl" => #{}
+ })
+ ),
+ ok.
+
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
@@ -644,9 +818,15 @@ config(Args) ->
config(Args, #{}).
config(Args0, More) ->
+ config(Args0, More, fun hocon_config_template/0).
+
+config_with_headers(Args) ->
+ config(Args, #{}, fun hocon_config_template_with_headers/0).
+
+config(Args0, More, ConfigTemplateFun) ->
Args1 = maps:merge(default_config(), Args0),
Args = maps:merge(Args1, More),
- ConfText = hocon_config(Args),
+ ConfText = hocon_config(Args, ConfigTemplateFun),
{ok, Conf} = hocon:binary(ConfText, #{format => map}),
ct:pal("Running tests with conf:\n~p", [Conf]),
InstId = maps:get("instance_id", Args),
@@ -661,7 +841,7 @@ config(Args0, More) ->
#{<<"bridges">> := #{TypeBin := #{Name := Parsed}}} = Conf,
Parsed.
-hocon_config(Args) ->
+hocon_config(Args, ConfigTemplateFun) ->
InstId = maps:get("instance_id", Args),
<<"bridge:", BridgeId/binary>> = InstId,
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId, #{atom_name => false}),
@@ -672,13 +852,17 @@ hocon_config(Args) ->
SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
QueryMode = maps:get("query_mode", Args, <<"async">>),
SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
+ KafkaHeaders = maps:get("kafka_headers", Args, undefined),
+ KafkaExtHeaders = maps:get("kafka_ext_headers", Args, <<"[]">>),
Hocon = bbmustache:render(
- iolist_to_binary(hocon_config_template()),
+ iolist_to_binary(ConfigTemplateFun()),
Args#{
"authentication" => AuthConfRendered,
"bridge_name" => Name,
"ssl" => SSLConfRendered,
- "query_mode" => QueryMode
+ "query_mode" => QueryMode,
+ "kafka_headers" => KafkaHeaders,
+ "kafka_ext_headers" => KafkaExtHeaders
}
),
Hocon.
@@ -717,6 +901,43 @@ bridges.kafka.{{ bridge_name }} {
}
""".
+%% erlfmt-ignore
+hocon_config_template_with_headers() ->
+%% TODO: rename the type to `kafka_producer' after alias support is
+%% added to hocon; keeping this as just `kafka' for backwards
+%% compatibility.
+"""
+bridges.kafka.{{ bridge_name }} {
+ bootstrap_hosts = \"{{ kafka_hosts_string }}\"
+ enable = true
+ authentication = {{{ authentication }}}
+ ssl = {{{ ssl }}}
+ local_topic = \"{{ local_topic }}\"
+ kafka = {
+ message = {
+ key = \"${clientid}\"
+ value = \"${.payload}\"
+ timestamp = \"${timestamp}\"
+ }
+ buffer = {
+ memory_overload_protection = false
+ }
+ kafka_headers = \"{{ kafka_headers }}\"
+ kafka_header_value_encode_mode: json
+ kafka_ext_headers: {{{ kafka_ext_headers }}}
+ partition_strategy = {{ partition_strategy }}
+ topic = \"{{ kafka_topic }}\"
+ query_mode = {{ query_mode }}
+ }
+ metadata_request_timeout = 5s
+ min_metadata_refresh_interval = 3s
+ socket_opts {
+ nodelay = true
+ }
+ connect_timeout = 5s
+}
+""".
+
%% erlfmt-ignore
hocon_config_template_authentication("none") ->
"none";
diff --git a/changes/ee/feat-11079.en.md b/changes/ee/feat-11079.en.md
new file mode 100644
index 000000000..2ac835fbd
--- /dev/null
+++ b/changes/ee/feat-11079.en.md
@@ -0,0 +1 @@
+Add support for custom headers in messages for Kafka producer bridge.
diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon
index d35e31a12..07b4cdd4e 100644
--- a/rel/i18n/emqx_bridge_kafka.hocon
+++ b/rel/i18n/emqx_bridge_kafka.hocon
@@ -200,6 +200,46 @@ required_acks.desc:
required_acks.label:
"""Required Acks"""
+kafka_headers.desc:
+"""Please provide a placeholder to be used as Kafka Headers
+e.g. ${pub_props}
+Notice that the value of the placeholder must either be an object:
+{\"foo\": \"bar\"}
+or an array of key-value pairs:
+[{\"key\": \"foo\", \"value\": \"bar\"}]
"""
+
+kafka_headers.label:
+"""Kafka Headers"""
+
+producer_kafka_ext_headers.desc:
+"""Please provide more key-value pairs for Kafka headers
+The key-value pairs here will be combined with the
+value of kafka_headers
field before sending to Kafka."""
+
+producer_kafka_ext_headers.label:
+"""Extra Kafka headers"""
+
+producer_kafka_ext_header_key.desc:
+"""Key of the Kafka header. Placeholders in format of ${var} are supported."""
+
+producer_kafka_ext_header_key.label:
+"""Kafka extra header key."""
+
+producer_kafka_ext_header_value.desc:
+"""Value of the Kafka header. Placeholders in format of ${var} are supported."""
+
+producer_kafka_ext_header_value.label:
+"""Value"""
+
+kafka_header_value_encode_mode.desc:
+"""Kafka headers value encode mode
+ - NONE: only add binary values to Kafka headers;
+ - JSON: only add JSON values to Kafka headers,
+and encode it to JSON strings before sending."""
+
+kafka_header_value_encode_mode.label:
+"""Kafka headers value encode mode"""
+
metadata_request_timeout.desc:
"""Maximum wait time when fetching metadata from Kafka."""