feat(kafka): add support for kafka headers and ext headers

Fixes https://emqx.atlassian.net/browse/EMQX-9175
This commit is contained in:
Paulo Zulato 2023-06-16 15:43:23 -03:00
parent 62d3766726
commit f47cc2a458
5 changed files with 495 additions and 13 deletions

View File

@ -92,6 +92,18 @@ values(producer) ->
partition_strategy => <<"random">>,
required_acks => <<"all_isr">>,
partition_count_refresh_interval => <<"60s">>,
kafka_headers => <<"${pub_props}">>,
kafka_ext_headers => [
#{
kafka_ext_header_key => <<"clientid">>,
kafka_ext_header_value => <<"${clientid}">>
},
#{
kafka_ext_header_key => <<"topic">>,
kafka_ext_header_value => <<"${topic}">>
}
],
kafka_header_value_encode_mode => none,
max_inflight => 10,
buffer => #{
mode => <<"hybrid">>,
@ -281,6 +293,31 @@ fields(producer_kafka_opts) ->
desc => ?DESC(required_acks)
}
)},
{kafka_headers,
mk(
binary(),
#{
required => false,
validator => fun kafka_header_validator/1,
desc => ?DESC(kafka_headers)
}
)},
{kafka_ext_headers,
mk(
hoconsc:array(ref(producer_kafka_ext_headers)),
#{
desc => ?DESC(producer_kafka_ext_headers),
required => false
}
)},
{kafka_header_value_encode_mode,
mk(
enum([none, json]),
#{
default => none,
desc => ?DESC(kafka_header_value_encode_mode)
}
)},
{partition_count_refresh_interval,
mk(
emqx_schema:timeout_duration_s(),
@ -319,6 +356,23 @@ fields(producer_kafka_opts) ->
}
)}
];
%% Schema for one extra Kafka header entry: a key/value pair where both
%% sides may contain ${var} placeholders; the value template is further
%% restricted by kafka_ext_header_value_validator/1.
fields(producer_kafka_ext_headers) ->
    [
        {kafka_ext_header_key,
            mk(
                binary(),
                #{required => true, desc => ?DESC(producer_kafka_ext_header_key)}
            )},
        {kafka_ext_header_value,
            mk(
                binary(),
                #{
                    required => true,
                    %% must be a single placeholder or a plain string
                    validator => fun kafka_ext_header_value_validator/1,
                    desc => ?DESC(producer_kafka_ext_header_value)
                }
            )}
    ];
fields(kafka_message) ->
[
{key, mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC(kafka_message_key)})},
@ -433,7 +487,8 @@ struct_names() ->
producer_opts,
consumer_opts,
consumer_kafka_opts,
consumer_topic_mapping
consumer_topic_mapping,
producer_kafka_ext_headers
].
%% -------------------------------------------------------------------------------------------------
@ -491,3 +546,27 @@ producer_strategy_key_validator(#{
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
%% Validates the `kafka_headers' config value: it must be absent or
%% consist of exactly one ${...} placeholder token after template
%% pre-processing (the whole headers object is resolved from it at runtime).
kafka_header_validator(undefined) ->
    ok;
kafka_header_validator(Tmpl) ->
    Tokens = emqx_placeholder:preproc_tmpl(Tmpl),
    case Tokens of
        [{var, _Name}] ->
            ok;
        _Other ->
            {error, "The 'kafka_headers' must be a single placeholder like ${pub_props}"}
    end.
%% Validates an extra-header value template: exactly one placeholder token
%% OR exactly one literal string token is accepted; mixed templates
%% (e.g. <<"wrong ${header}">>) are rejected.
kafka_ext_header_value_validator(undefined) ->
    ok;
kafka_ext_header_value_validator(Tmpl) ->
    case emqx_placeholder:preproc_tmpl(Tmpl) of
        [{var, _}] ->
            ok;
        [{str, _}] ->
            ok;
        _Mixed ->
            {
                error,
                "The value of 'kafka_ext_headers' must either be a single "
                "placeholder like ${foo}, or a simple string."
            }
    end.

View File

@ -57,6 +57,9 @@ on_start(InstId, Config) ->
socket_opts := SocketOpts,
ssl := SSL
} = Config,
KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
BridgeType = ?BRIDGE_TYPE,
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
@ -116,7 +119,10 @@ on_start(InstId, Config) ->
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout,
hosts => Hosts,
kafka_config => KafkaConfig
kafka_config => KafkaConfig,
headers_tokens => KafkaHeadersTokens,
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
}};
{error, Reason2} ->
?SLOG(error, #{
@ -210,11 +216,19 @@ on_query(
#{
message_template := Template,
producers := Producers,
sync_query_timeout := SyncTimeout
sync_query_timeout := SyncTimeout,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
headers_val_encode_mode := KafkaHeadersValEncodeMode
}
) ->
?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}),
KafkaMessage = render_message(Template, Message),
KafkaHeaders = #{
headers_tokens => KafkaHeadersTokens,
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(emqx_bridge_kafka_impl_producer_sync_query, KafkaHeaders),
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
@ -235,10 +249,21 @@ on_query_async(
_InstId,
{send_message, Message},
AsyncReplyFn,
#{message_template := Template, producers := Producers}
#{
message_template := Template,
producers := Producers,
headers_tokens := KafkaHeadersTokens,
ext_headers_tokens := KafkaExtHeadersTokens,
headers_val_encode_mode := KafkaHeadersValEncodeMode
}
) ->
?tp(emqx_bridge_kafka_impl_producer_async_query, #{}),
KafkaMessage = render_message(Template, Message),
KafkaHeaders = #{
headers_tokens => KafkaHeadersTokens,
ext_headers_tokens => KafkaExtHeadersTokens,
headers_val_encode_mode => KafkaHeadersValEncodeMode
},
KafkaMessage = render_message(Template, KafkaHeaders, Message),
?tp(emqx_bridge_kafka_impl_producer_async_query, KafkaHeaders),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes
%% for counters and gauges.
@ -264,11 +289,25 @@ preproc_tmpl(Tmpl) ->
emqx_placeholder:preproc_tmpl(Tmpl).
%% Renders one MQTT message into the Kafka message map understood by wolff.
%% Headers are assembled from the `kafka_headers' template (if configured)
%% merged with the pre-processed extra headers, then normalized according
%% to the configured value-encode mode (`none' keeps binaries only,
%% `json' encodes every value).
%% NOTE: the diff view contained both the pre-change and post-change
%% signature lines; this is the coherent post-change clause.
render_message(
    #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
    #{
        headers_tokens := KafkaHeadersTokens,
        ext_headers_tokens := KafkaExtHeadersTokens,
        headers_val_encode_mode := KafkaHeadersValEncodeMode
    },
    Message
) ->
    ExtHeaders = proc_ext_headers(KafkaExtHeadersTokens, Message),
    KafkaHeaders =
        case KafkaHeadersTokens of
            %% no kafka_headers template configured: extra headers only
            undefined -> ExtHeaders;
            HeadersTks -> merge_kafka_headers(HeadersTks, ExtHeaders, Message)
        end,
    Headers = formalize_kafka_headers(KafkaHeaders, KafkaHeadersValEncodeMode),
    #{
        key => render(KeyTemplate, Message),
        value => render(ValueTemplate, Message),
        headers => Headers,
        ts => render_timestamp(TimestampTemplate, Message)
    }.
@ -509,3 +548,105 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
%% multiplying the metric counts...
#{bridge_id => ResourceID}
).
%% Pre-compiles the `kafka_headers' template once at bridge start.
%% An empty or missing template is normalized to `undefined' so the
%% render path can skip header merging entirely.
preproc_kafka_headers(undefined) ->
    undefined;
preproc_kafka_headers(<<>>) ->
    undefined;
preproc_kafka_headers(HeadersTmpl) ->
    %% the value is already validated by schema, so we do not validate it again.
    emqx_placeholder:preproc_tmpl(HeadersTmpl).
%% Pre-compiles every extra header key/value template at bridge start,
%% yielding a list of {KeyTokens, ValueTokens} pairs.
preproc_ext_headers(Headers) ->
    [
        {emqx_placeholder:preproc_tmpl(Key), preproc_ext_headers_value(Value)}
     || #{kafka_ext_header_key := Key, kafka_ext_header_value := Value} <- Headers
    ].
%% Pre-compiles one extra-header value template into placeholder tokens.
preproc_ext_headers_value(ValTmpl) ->
    %% the value is already validated by schema, so we do not validate it again.
    emqx_placeholder:preproc_tmpl(ValTmpl).
%% Renders the pre-compiled extra headers against a concrete message.
%% Entries whose key or value references a placeholder absent from the
%% message are silently dropped (signalled by `placeholder_not_found').
proc_ext_headers(ExtHeaders, Msg) ->
    RenderOne = fun({KeyTokens, ValTokens}) ->
        try
            Key = proc_ext_headers_key(KeyTokens, Msg),
            Val = proc_ext_headers_value(ValTokens, Msg),
            {true, {Key, Val}}
        catch
            throw:placeholder_not_found -> false
        end
    end,
    lists:filtermap(RenderOne, ExtHeaders).
%% Renders a header key template to a binary; throws `placeholder_not_found'
%% when any placeholder in the key cannot be resolved from the message.
proc_ext_headers_key(KeyTks, Msg) ->
    Parts = emqx_placeholder:proc_tmpl(KeyTks, Msg, #{return => rawlist}),
    Rendered = [
        case Part of
            undefined -> throw(placeholder_not_found);
            _ -> bin(Part)
        end
     || Part <- Parts
    ],
    iolist_to_binary(Rendered).
%% Renders a header value template; an unresolved placeholder means the
%% whole header entry must be dropped, signalled via throw to the caller.
proc_ext_headers_value(ValTks, Msg) ->
    Rendered = emqx_placeholder:proc_tmpl(ValTks, Msg, #{return => rawlist}),
    case Rendered of
        [undefined] -> throw(placeholder_not_found);
        [Value] -> Value
    end.
%% Normalizes a list of header representations into a [{Key, Value}] list.
%% Accepted element shapes: #{key/value} maps with atom or binary keys,
%% and already-formed 2-tuples; anything else aborts header rendering.
kvlist_headers(Headers, Acc) ->
    case Headers of
        [] ->
            lists:reverse(Acc);
        [Header | Rest] ->
            KV =
                case Header of
                    #{key := K, value := V} -> {K, V};
                    #{<<"key">> := K, <<"value">> := V} -> {K, V};
                    {_K, _V} = Pair -> Pair;
                    Bad -> throw({bad_kafka_header, Bad})
                end,
            kvlist_headers(Rest, [KV | Acc])
    end.
-define(IS_STR_KEY(K), (is_list(K) orelse is_atom(K) orelse is_binary(K))).

%% Final normalization of the merged header list before handing to wolff.
%% `none' mode: keep only entries whose value is already a binary and whose
%% key is string-like, dropping everything else (as documented).
%% `json' mode: JSON-encode every value, silently dropping entries that
%% cannot be encoded (best-effort by design).
formalize_kafka_headers(Headers, none) ->
    lists:filtermap(
        fun
            ({K, V}) when is_binary(V) andalso ?IS_STR_KEY(K) ->
                {true, {bin(K), V}};
            (_Other) ->
                %% non-binary value, non-string key, or malformed entry
                false
        end,
        Headers
    );
formalize_kafka_headers(Headers, json) ->
    EncodeOne = fun({K, V}) ->
        try
            {true, {bin(K), emqx_utils_json:encode(V)}}
        catch
            _:_ -> false
        end
    end,
    lists:filtermap(EncodeOne, Headers).
%% Resolves the `kafka_headers' placeholder against the message and merges
%% the result with the already-rendered extra headers (appended AFTER the
%% template headers). The resolved value may be a map, a key-value list,
%% or a binary holding a JSON-encoded object/array; anything else throws.
merge_kafka_headers(HeadersTks, ExtHeaders, Msg) ->
    case emqx_placeholder:proc_tmpl(HeadersTks, Msg, #{return => rawlist}) of
        % Headers by map object.
        [Map] when is_map(Map) ->
            maps:to_list(Map) ++ ExtHeaders;
        [KVList] when is_list(KVList) ->
            kvlist_headers(KVList, []) ++ ExtHeaders;
        %% the placeholder cannot be found in Msg
        [undefined] ->
            ExtHeaders;
        %% a binary may itself be a pre-encoded JSON document; try to decode
        [MaybeJson] when is_binary(MaybeJson) ->
            case emqx_utils_json:safe_decode(MaybeJson) of
                {ok, JsonTerm} when is_map(JsonTerm) ->
                    maps:to_list(JsonTerm) ++ ExtHeaders;
                {ok, JsonTerm} when is_list(JsonTerm) ->
                    kvlist_headers(JsonTerm, []) ++ ExtHeaders;
                _ ->
                    throw({bad_kafka_headers, MaybeJson})
            end;
        BadHeaders ->
            throw({bad_kafka_headers, BadHeaders})
    end.
%% Coerces a string-like term (binary | charlist | atom) to a binary.
bin(Bin) when is_binary(Bin) ->
    Bin;
bin(Chars) when is_list(Chars) ->
    iolist_to_binary(Chars);
bin(Atom) when is_atom(Atom) ->
    erlang:atom_to_binary(Atom, utf8).

View File

@ -504,6 +504,180 @@ t_table_removed(_Config) ->
delete_all_bridges(),
ok.
%% End-to-end check that both `kafka_headers' (a single ${pub_props}
%% placeholder) and `kafka_ext_headers' (clientid/payload pairs) are
%% pre-processed into tokens at bridge start and show up in the message
%% actually produced to Kafka, with JSON value-encode mode.
t_send_message_with_headers(Config) ->
    HostsString = kafka_hosts_string_sasl(),
    AuthSettings = valid_sasl_plain_settings(),
    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
    Type = ?BRIDGE_TYPE,
    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
    BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
    KafkaTopic = "test-topic-one-partition",
    Conf = config_with_headers(#{
        "authentication" => AuthSettings,
        "kafka_hosts_string" => HostsString,
        "kafka_topic" => KafkaTopic,
        "instance_id" => ResourceId,
        "kafka_headers" => <<"${pub_props}">>,
        "kafka_ext_headers" => emqx_utils_json:encode(
            [
                #{
                    <<"kafka_ext_header_key">> => <<"clientid">>,
                    <<"kafka_ext_header_value">> => <<"${clientid}">>
                },
                #{
                    <<"kafka_ext_header_key">> => <<"payload">>,
                    <<"kafka_ext_header_value">> => <<"${payload}">>
                }
            ]
        ),
        "producer" => #{
            "kafka" => #{
                "buffer" => #{
                    "memory_overload_protection" => false
                }
            }
        },
        "ssl" => #{}
    }),
    {ok, #{config := ConfigAtom1}} = emqx_bridge:create(
        Type, erlang:list_to_atom(Name), Conf
    ),
    ConfigAtom = ConfigAtom1#{bridge_name => Name},
    {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
    Time = erlang:unique_integer(),
    BinTime = integer_to_binary(Time),
    Msg = #{
        clientid => BinTime,
        payload => <<"payload">>,
        timestamp => Time
    },
    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
    ct:pal("base offset before testing ~p", [Offset]),
    %% pick the trace point matching the query mode under test
    Kind =
        case proplists:get_value(query_api, Config) of
            on_query -> emqx_bridge_kafka_impl_producer_sync_query;
            on_query_async -> emqx_bridge_kafka_impl_producer_async_query
        end,
    ?check_trace(
        begin
            ok = send(Config, ResourceId, Msg, State)
        end,
        fun(Trace) ->
            %% the query path must carry the pre-processed header tokens
            ?assertMatch(
                [
                    #{
                        ext_headers_tokens := [
                            {
                                [{str, <<"clientid">>}],
                                [{var, [<<"clientid">>]}]
                            },
                            {
                                [{str, <<"payload">>}],
                                [{var, [<<"payload">>]}]
                            }
                        ],
                        headers_tokens := [{var, [<<"pub_props">>]}],
                        headers_val_encode_mode := json
                    }
                ],
                ?of_kind(Kind, Trace)
            )
        end
    ),
    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
    %% JSON mode: the payload header value arrives JSON-encoded (quoted)
    ?assertMatch(
        #kafka_message{
            headers = [
                {<<"clientid">>, _},
                {<<"payload">>, <<"\"payload\"">>}
            ],
            key = BinTime
        },
        KafkaMsg
    ),
    %% NOTE(review): redundant with the match above, which already pins `key'
    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
    %% TODO: refactor those into init/end per testcase
    ok = ?PRODUCER:on_stop(ResourceId, State),
    ?assertEqual([], supervisor:which_children(wolff_client_sup)),
    ?assertEqual([], supervisor:which_children(wolff_producers_sup)),
    ok = emqx_bridge_resource:remove(BridgeId),
    delete_all_bridges(),
    ok.
%% Negative cases: schema validation must reject (1) a `kafka_headers'
%% value that is not a single placeholder, and (2) an ext-header value
%% that mixes a literal string with a placeholder.
t_wrong_headers(_Config) ->
    HostsString = kafka_hosts_string_sasl(),
    AuthSettings = valid_sasl_plain_settings(),
    Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]),
    Type = ?BRIDGE_TYPE,
    Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash),
    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
    KafkaTopic = "test-topic-one-partition",
    %% `kafka_headers' must be a single placeholder, not a literal string
    ?assertThrow(
        {
            emqx_bridge_schema,
            [
                #{
                    kind := validation_error,
                    reason := "The 'kafka_headers' must be a single placeholder like ${pub_props}"
                }
            ]
        },
        config_with_headers(#{
            "authentication" => AuthSettings,
            "kafka_hosts_string" => HostsString,
            "kafka_topic" => KafkaTopic,
            "instance_id" => ResourceId,
            "kafka_headers" => <<"wrong_header">>,
            "kafka_ext_headers" => <<"[]">>,
            "producer" => #{
                "kafka" => #{
                    "buffer" => #{
                        "memory_overload_protection" => false
                    }
                }
            },
            "ssl" => #{}
        })
    ),
    %% ext-header values may be one placeholder OR a plain string, not both
    ?assertThrow(
        {
            emqx_bridge_schema,
            [
                #{
                    kind := validation_error,
                    reason :=
                        "The value of 'kafka_ext_headers' must either be a single "
                        "placeholder like ${foo}, or a simple string."
                }
            ]
        },
        config_with_headers(#{
            "authentication" => AuthSettings,
            "kafka_hosts_string" => HostsString,
            "kafka_topic" => KafkaTopic,
            "instance_id" => ResourceId,
            "kafka_headers" => <<"${pub_props}">>,
            "kafka_ext_headers" => emqx_utils_json:encode(
                [
                    #{
                        <<"kafka_ext_header_key">> => <<"clientid">>,
                        <<"kafka_ext_header_value">> => <<"wrong ${header}">>
                    }
                ]
            ),
            "producer" => #{
                "kafka" => #{
                    "buffer" => #{
                        "memory_overload_protection" => false
                    }
                }
            },
            "ssl" => #{}
        })
    ),
    ok.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
@ -644,9 +818,15 @@ config(Args) ->
config(Args, #{}).
config(Args0, More) ->
config(Args0, More, fun hocon_config_template/0).
%% Like config/1 but renders the bridge config from the template that
%% includes the kafka_headers / kafka_ext_headers fields.
config_with_headers(Args) ->
    config(Args, #{}, fun hocon_config_template_with_headers/0).
config(Args0, More, ConfigTemplateFun) ->
Args1 = maps:merge(default_config(), Args0),
Args = maps:merge(Args1, More),
ConfText = hocon_config(Args),
ConfText = hocon_config(Args, ConfigTemplateFun),
{ok, Conf} = hocon:binary(ConfText, #{format => map}),
ct:pal("Running tests with conf:\n~p", [Conf]),
InstId = maps:get("instance_id", Args),
@ -661,7 +841,7 @@ config(Args0, More) ->
#{<<"bridges">> := #{TypeBin := #{Name := Parsed}}} = Conf,
Parsed.
hocon_config(Args) ->
hocon_config(Args, ConfigTemplateFun) ->
InstId = maps:get("instance_id", Args),
<<"bridge:", BridgeId/binary>> = InstId,
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId, #{atom_name => false}),
@ -672,13 +852,17 @@ hocon_config(Args) ->
SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
QueryMode = maps:get("query_mode", Args, <<"async">>),
SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
KafkaHeaders = maps:get("kafka_headers", Args, undefined),
KafkaExtHeaders = maps:get("kafka_ext_headers", Args, <<"[]">>),
Hocon = bbmustache:render(
iolist_to_binary(hocon_config_template()),
iolist_to_binary(ConfigTemplateFun()),
Args#{
"authentication" => AuthConfRendered,
"bridge_name" => Name,
"ssl" => SSLConfRendered,
"query_mode" => QueryMode
"query_mode" => QueryMode,
"kafka_headers" => KafkaHeaders,
"kafka_ext_headers" => KafkaExtHeaders
}
),
Hocon.
@ -717,6 +901,43 @@ bridges.kafka.{{ bridge_name }} {
}
""".
%% erlfmt-ignore
%% Bridge HOCON template used by the header test cases: same shape as
%% hocon_config_template/0 plus kafka_headers / kafka_ext_headers fields,
%% with the header value encode mode fixed to `json'.
hocon_config_template_with_headers() ->
%% TODO: rename the type to `kafka_producer' after alias support is
%% added to hocon; keeping this as just `kafka' for backwards
%% compatibility.
"""
bridges.kafka.{{ bridge_name }} {
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
enable = true
authentication = {{{ authentication }}}
ssl = {{{ ssl }}}
local_topic = \"{{ local_topic }}\"
kafka = {
message = {
key = \"${clientid}\"
value = \"${.payload}\"
timestamp = \"${timestamp}\"
}
buffer = {
memory_overload_protection = false
}
kafka_headers = \"{{ kafka_headers }}\"
kafka_header_value_encode_mode: json
kafka_ext_headers: {{{ kafka_ext_headers }}}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s
socket_opts {
nodelay = true
}
connect_timeout = 5s
}
""".
%% erlfmt-ignore
hocon_config_template_authentication("none") ->
"none";

View File

@ -0,0 +1 @@
Add support for custom headers in messages for Kafka producer bridge.

View File

@ -200,6 +200,46 @@ required_acks.desc:
required_acks.label:
"""Required Acks"""
kafka_headers.desc:
"""Please provide a placeholder to be used as Kafka Headers<br/>
e.g. <code>${pub_props}</code><br/>
Notice that the value of the placeholder must either be an object:
<code>{\"foo\": \"bar\"}</code>
or an array of key-value pairs:
<code>[{\"key\": \"foo\", \"value\": \"bar\"}]</code>"""
kafka_headers.label:
"""Kafka Headers"""
producer_kafka_ext_headers.desc:
"""Please provide more key-value pairs for Kafka headers<br/>
The key-value pairs here will be combined with the
value of <code>kafka_headers</code> field before sending to Kafka."""
producer_kafka_ext_headers.label:
"""Extra Kafka headers"""
producer_kafka_ext_header_key.desc:
"""Key of the Kafka header. Placeholders in the format of ${var} are supported."""
producer_kafka_ext_header_key.label:
"""Kafka extra header key."""
producer_kafka_ext_header_value.desc:
"""Value of the Kafka header. Placeholders in the format of ${var} are supported."""
producer_kafka_ext_header_value.label:
"""Value"""
kafka_header_value_encode_mode.desc:
"""Kafka headers value encode mode<br/>
- NONE: only add binary values to Kafka headers;<br/>
- JSON: only add JSON values to Kafka headers,
and encode them to JSON strings before sending."""
kafka_header_value_encode_mode.label:
"""Kafka headers value encode mode"""
metadata_request_timeout.desc:
"""Maximum wait time when fetching metadata from Kafka."""