From f47cc2a458df761eb19d5c4cf111e0958c243bbb Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Fri, 16 Jun 2023 15:43:23 -0300 Subject: [PATCH] feat(kafka): add support for kafka headers and ext headers Fixes https://emqx.atlassian.net/browse/EMQX-9175 --- .../src/emqx_bridge_kafka.erl | 81 ++++++- .../src/emqx_bridge_kafka_impl_producer.erl | 157 +++++++++++- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 229 +++++++++++++++++- changes/ee/feat-11079.en.md | 1 + rel/i18n/emqx_bridge_kafka.hocon | 40 +++ 5 files changed, 495 insertions(+), 13 deletions(-) create mode 100644 changes/ee/feat-11079.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 4db685657..394093918 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -92,6 +92,18 @@ values(producer) -> partition_strategy => <<"random">>, required_acks => <<"all_isr">>, partition_count_refresh_interval => <<"60s">>, + kafka_headers => <<"${pub_props}">>, + kafka_ext_headers => [ + #{ + kafka_ext_header_key => <<"clientid">>, + kafka_ext_header_value => <<"${clientid}">> + }, + #{ + kafka_ext_header_key => <<"topic">>, + kafka_ext_header_value => <<"${topic}">> + } + ], + kafka_header_value_encode_mode => none, max_inflight => 10, buffer => #{ mode => <<"hybrid">>, @@ -281,6 +293,31 @@ fields(producer_kafka_opts) -> desc => ?DESC(required_acks) } )}, + {kafka_headers, + mk( + binary(), + #{ + required => false, + validator => fun kafka_header_validator/1, + desc => ?DESC(kafka_headers) + } + )}, + {kafka_ext_headers, + mk( + hoconsc:array(ref(producer_kafka_ext_headers)), + #{ + desc => ?DESC(producer_kafka_ext_headers), + required => false + } + )}, + {kafka_header_value_encode_mode, + mk( + enum([none, json]), + #{ + default => none, + desc => ?DESC(kafka_header_value_encode_mode) + } + )}, {partition_count_refresh_interval, mk( emqx_schema:timeout_duration_s(), @@ -319,6 +356,23 @@ fields(producer_kafka_opts) -> } )} ]; +fields(producer_kafka_ext_headers) -> + [ + {kafka_ext_header_key, + mk( + binary(), + #{required => true, desc => ?DESC(producer_kafka_ext_header_key)} + )}, + {kafka_ext_header_value, + mk( + binary(), + #{ + required => true, + validator => fun kafka_ext_header_value_validator/1, + desc => ?DESC(producer_kafka_ext_header_value) + } + )} + ]; fields(kafka_message) -> [ {key, mk(string(), #{default => <<"${.clientid}">>, desc => ?DESC(kafka_message_key)})}, @@ -433,7 +487,8 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_topic_mapping + consumer_topic_mapping, + producer_kafka_ext_headers ]. %% ------------------------------------------------------------------------------------------------- @@ -491,3 +546,27 @@ producer_strategy_key_validator(#{ {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; producer_strategy_key_validator(_) -> ok. + +kafka_header_validator(undefined) -> + ok; +kafka_header_validator(Value) -> + case emqx_placeholder:preproc_tmpl(Value) of + [{var, _}] -> + ok; + _ -> + {error, "The 'kafka_headers' must be a single placeholder like ${pub_props}"} + end. + +kafka_ext_header_value_validator(undefined) -> + ok; +kafka_ext_header_value_validator(Value) -> + case emqx_placeholder:preproc_tmpl(Value) of + [{Type, _}] when Type =:= var orelse Type =:= str -> + ok; + _ -> + { + error, + "The value of 'kafka_ext_headers' must either be a single " + "placeholder like ${foo}, or a simple string." + } + end. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index be4c2d860..4fa188c95 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -57,6 +57,9 @@ on_start(InstId, Config) -> socket_opts := SocketOpts, ssl := SSL } = Config, + KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), + KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), + KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), @@ -116,7 +119,10 @@ on_start(InstId, Config) -> resource_id => ResourceId, sync_query_timeout => SyncQueryTimeout, hosts => Hosts, - kafka_config => KafkaConfig + kafka_config => KafkaConfig, + headers_tokens => KafkaHeadersTokens, + ext_headers_tokens => KafkaExtHeadersTokens, + headers_val_encode_mode => KafkaHeadersValEncodeMode }}; {error, Reason2} -> ?SLOG(error, #{ @@ -210,11 +216,19 @@ on_query( #{ message_template := Template, producers := Producers, - sync_query_timeout := SyncTimeout + sync_query_timeout := SyncTimeout, + headers_tokens := KafkaHeadersTokens, + ext_headers_tokens := KafkaExtHeadersTokens, + headers_val_encode_mode := KafkaHeadersValEncodeMode } ) -> - ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}), - KafkaMessage = render_message(Template, Message), + KafkaHeaders = #{ + headers_tokens => KafkaHeadersTokens, + ext_headers_tokens => KafkaExtHeadersTokens, + headers_val_encode_mode => KafkaHeadersValEncodeMode + }, + KafkaMessage = render_message(Template, KafkaHeaders, Message), + ?tp(emqx_bridge_kafka_impl_producer_sync_query, KafkaHeaders), try {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), ok @@ -235,10 +249,21 @@ on_query_async( _InstId, {send_message, Message}, AsyncReplyFn, - #{message_template := Template, producers := Producers} + #{ + message_template := Template, + producers := Producers, + headers_tokens := KafkaHeadersTokens, + ext_headers_tokens := KafkaExtHeadersTokens, + headers_val_encode_mode := KafkaHeadersValEncodeMode + } ) -> - ?tp(emqx_bridge_kafka_impl_producer_async_query, #{}), - KafkaMessage = render_message(Template, Message), + KafkaHeaders = #{ + headers_tokens => KafkaHeadersTokens, + ext_headers_tokens => KafkaExtHeadersTokens, + headers_val_encode_mode => KafkaHeadersValEncodeMode + }, + KafkaMessage = render_message(Template, KafkaHeaders, Message), + ?tp(emqx_bridge_kafka_impl_producer_async_query, KafkaHeaders), %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes %% for counters and gauges. @@ -264,11 +289,25 @@ preproc_tmpl(Tmpl) -> emqx_placeholder:preproc_tmpl(Tmpl). render_message( - #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message + #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, + #{ + headers_tokens := KafkaHeadersTokens, + ext_headers_tokens := KafkaExtHeadersTokens, + headers_val_encode_mode := KafkaHeadersValEncodeMode + }, + Message ) -> + ExtHeaders = proc_ext_headers(KafkaExtHeadersTokens, Message), + KafkaHeaders = + case KafkaHeadersTokens of + undefined -> ExtHeaders; + HeadersTks -> merge_kafka_headers(HeadersTks, ExtHeaders, Message) + end, + Headers = formalize_kafka_headers(KafkaHeaders, KafkaHeadersValEncodeMode), #{ key => render(KeyTemplate, Message), value => render(ValueTemplate, Message), + headers => Headers, ts => render_timestamp(TimestampTemplate, Message) }. @@ -509,3 +548,105 @@ maybe_install_wolff_telemetry_handlers(ResourceID) -> %% multiplying the metric counts... #{bridge_id => ResourceID} ). + +preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined -> + undefined; +preproc_kafka_headers(HeadersTmpl) -> + %% the value is already validated by schema, so we do not validate it again. + emqx_placeholder:preproc_tmpl(HeadersTmpl). + +preproc_ext_headers(Headers) -> + [ + {emqx_placeholder:preproc_tmpl(K), preproc_ext_headers_value(V)} + || #{kafka_ext_header_key := K, kafka_ext_header_value := V} <- Headers + ]. + +preproc_ext_headers_value(ValTmpl) -> + %% the value is already validated by schema, so we do not validate it again. + emqx_placeholder:preproc_tmpl(ValTmpl). + +proc_ext_headers(ExtHeaders, Msg) -> + lists:filtermap( + fun({KTks, VTks}) -> + try + Key = proc_ext_headers_key(KTks, Msg), + Val = proc_ext_headers_value(VTks, Msg), + {true, {Key, Val}} + catch + throw:placeholder_not_found -> false + end + end, + ExtHeaders + ). + +proc_ext_headers_key(KeyTks, Msg) -> + RawList = emqx_placeholder:proc_tmpl(KeyTks, Msg, #{return => rawlist}), + list_to_binary( + lists:map( + fun + (undefined) -> throw(placeholder_not_found); + (Key) -> bin(Key) + end, + RawList + ) + ). + +proc_ext_headers_value(ValTks, Msg) -> + case emqx_placeholder:proc_tmpl(ValTks, Msg, #{return => rawlist}) of + [undefined] -> throw(placeholder_not_found); + [Value] -> Value + end. + +kvlist_headers([], Acc) -> + lists:reverse(Acc); +kvlist_headers([#{key := K, value := V} | Headers], Acc) -> + kvlist_headers(Headers, [{K, V} | Acc]); +kvlist_headers([#{<<"key">> := K, <<"value">> := V} | Headers], Acc) -> + kvlist_headers(Headers, [{K, V} | Acc]); +kvlist_headers([{K, V} | Headers], Acc) -> + kvlist_headers(Headers, [{K, V} | Acc]); +kvlist_headers([BadHeader | _], _) -> + throw({bad_kafka_header, BadHeader}). + +-define(IS_STR_KEY(K), (is_list(K) orelse is_atom(K) orelse is_binary(K))). +formalize_kafka_headers(Headers, none) -> + %% Note that we will drop all the non-binary values in the NONE mode + [{bin(K), V} || {K, V} <- Headers, is_binary(V) andalso ?IS_STR_KEY(K)]; +formalize_kafka_headers(Headers, json) -> + lists:filtermap( + fun({K, V}) -> + try + {true, {bin(K), emqx_utils_json:encode(V)}} + catch + _:_ -> false + end + end, + Headers + ). + +merge_kafka_headers(HeadersTks, ExtHeaders, Msg) -> + case emqx_placeholder:proc_tmpl(HeadersTks, Msg, #{return => rawlist}) of + % Headers by map object. + [Map] when is_map(Map) -> + maps:to_list(Map) ++ ExtHeaders; + [KVList] when is_list(KVList) -> + kvlist_headers(KVList, []) ++ ExtHeaders; + %% the placeholder cannot be found in Msg + [undefined] -> + ExtHeaders; + [MaybeJson] when is_binary(MaybeJson) -> + case emqx_utils_json:safe_decode(MaybeJson) of + {ok, JsonTerm} when is_map(JsonTerm) -> + maps:to_list(JsonTerm) ++ ExtHeaders; + {ok, JsonTerm} when is_list(JsonTerm) -> + kvlist_headers(JsonTerm, []) ++ ExtHeaders; + _ -> + throw({bad_kafka_headers, MaybeJson}) + end; + BadHeaders -> + throw({bad_kafka_headers, BadHeaders}) + end. + +bin(B) when is_binary(B) -> B; +bin(L) when is_list(L) -> erlang:list_to_binary(L); +bin(A) when is_atom(A) -> erlang:atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 6031c21cb..38d58c1e7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -504,6 +504,180 @@ t_table_removed(_Config) -> delete_all_bridges(), ok. +t_send_message_with_headers(Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + KafkaTopic = "test-topic-one-partition", + Conf = config_with_headers(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "kafka_headers" => <<"${pub_props}">>, + "kafka_ext_headers" => emqx_utils_json:encode( + [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"${clientid}">> + }, + #{ + <<"kafka_ext_header_key">> => <<"payload">>, + <<"kafka_ext_header_value">> => <<"${payload}">> + } + ] + ), + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, #{config := ConfigAtom1}} = emqx_bridge:create( + Type, erlang:list_to_atom(Name), Conf + ), + ConfigAtom = ConfigAtom1#{bridge_name => Name}, + {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + Kind = + case proplists:get_value(query_api, Config) of + on_query -> emqx_bridge_kafka_impl_producer_sync_query; + on_query_async -> emqx_bridge_kafka_impl_producer_async_query + end, + ?check_trace( + begin + ok = send(Config, ResourceId, Msg, State) + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + ext_headers_tokens := [ + { + [{str, <<"clientid">>}], + [{var, [<<"clientid">>]}] + }, + { + [{str, <<"payload">>}], + [{var, [<<"payload">>]}] + } + ], + headers_tokens := [{var, [<<"pub_props">>]}], + headers_val_encode_mode := json + } + ], + ?of_kind(Kind, Trace) + ) + end + ), + {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + ?assertMatch( + #kafka_message{ + headers = [ + {<<"clientid">>, _}, + {<<"payload">>, <<"\"payload\"">>} + ], + key = BinTime + }, + KafkaMsg + ), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), + %% TODO: refactor those into init/end per testcase + ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), + ok = emqx_bridge_resource:remove(BridgeId), + delete_all_bridges(), + ok. + +t_wrong_headers(_Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + KafkaTopic = "test-topic-one-partition", + ?assertThrow( + { + emqx_bridge_schema, + [ + #{ + kind := validation_error, + reason := "The 'kafka_headers' must be a single placeholder like ${pub_props}" + } + ] + }, + config_with_headers(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "kafka_headers" => <<"wrong_header">>, + "kafka_ext_headers" => <<"[]">>, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }) + ), + ?assertThrow( + { + emqx_bridge_schema, + [ + #{ + kind := validation_error, + reason := + "The value of 'kafka_ext_headers' must either be a single " + "placeholder like ${foo}, or a simple string." + } + ] + }, + config_with_headers(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "kafka_headers" => <<"${pub_props}">>, + "kafka_ext_headers" => emqx_utils_json:encode( + [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"wrong ${header}">> + } + ] + ), + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }) + ), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ @@ -644,9 +818,15 @@ config(Args) -> config(Args, #{}). config(Args0, More) -> + config(Args0, More, fun hocon_config_template/0). + +config_with_headers(Args) -> + config(Args, #{}, fun hocon_config_template_with_headers/0). + +config(Args0, More, ConfigTemplateFun) -> Args1 = maps:merge(default_config(), Args0), Args = maps:merge(Args1, More), - ConfText = hocon_config(Args), + ConfText = hocon_config(Args, ConfigTemplateFun), {ok, Conf} = hocon:binary(ConfText, #{format => map}), ct:pal("Running tests with conf:\n~p", [Conf]), InstId = maps:get("instance_id", Args), @@ -661,7 +841,7 @@ config(Args0, More) -> #{<<"bridges">> := #{TypeBin := #{Name := Parsed}}} = Conf, Parsed. -hocon_config(Args) -> +hocon_config(Args, ConfigTemplateFun) -> InstId = maps:get("instance_id", Args), <<"bridge:", BridgeId/binary>> = InstId, {_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId, #{atom_name => false}), @@ -672,13 +852,17 @@ hocon_config(Args) -> SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), QueryMode = maps:get("query_mode", Args, <<"async">>), SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), + KafkaHeaders = maps:get("kafka_headers", Args, undefined), + KafkaExtHeaders = maps:get("kafka_ext_headers", Args, <<"[]">>), Hocon = bbmustache:render( - iolist_to_binary(hocon_config_template()), + iolist_to_binary(ConfigTemplateFun()), Args#{ "authentication" => AuthConfRendered, "bridge_name" => Name, "ssl" => SSLConfRendered, - "query_mode" => QueryMode + "query_mode" => QueryMode, + "kafka_headers" => KafkaHeaders, + "kafka_ext_headers" => KafkaExtHeaders } ), Hocon. @@ -717,6 +901,43 @@ bridges.kafka.{{ bridge_name }} { } """. +%% erlfmt-ignore +hocon_config_template_with_headers() -> +%% TODO: rename the type to `kafka_producer' after alias support is +%% added to hocon; keeping this as just `kafka' for backwards +%% compatibility. +""" +bridges.kafka.{{ bridge_name }} { + bootstrap_hosts = \"{{ kafka_hosts_string }}\" + enable = true + authentication = {{{ authentication }}} + ssl = {{{ ssl }}} + local_topic = \"{{ local_topic }}\" + kafka = { + message = { + key = \"${clientid}\" + value = \"${.payload}\" + timestamp = \"${timestamp}\" + } + buffer = { + memory_overload_protection = false + } + kafka_headers = \"{{ kafka_headers }}\" + kafka_header_value_encode_mode: json + kafka_ext_headers: {{{ kafka_ext_headers }}} + partition_strategy = {{ partition_strategy }} + topic = \"{{ kafka_topic }}\" + query_mode = {{ query_mode }} + } + metadata_request_timeout = 5s + min_metadata_refresh_interval = 3s + socket_opts { + nodelay = true + } + connect_timeout = 5s +} +""". + %% erlfmt-ignore hocon_config_template_authentication("none") -> "none"; diff --git a/changes/ee/feat-11079.en.md b/changes/ee/feat-11079.en.md new file mode 100644 index 000000000..2ac835fbd --- /dev/null +++ b/changes/ee/feat-11079.en.md @@ -0,0 +1 @@ +Add support for custom headers in messages for Kafka producer bridge. diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index d35e31a12..07b4cdd4e 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -200,6 +200,46 @@ required_acks.desc: required_acks.label: """Required Acks""" +kafka_headers.desc: +"""Please provide a placeholder to be used as Kafka Headers
+e.g. ${pub_props}
+Notice that the value of the placeholder must either be an object: +{\"foo\": \"bar\"} +or an array of key-value pairs: +[{\"key\": \"foo\", \"value\": \"bar\"}]""" + +kafka_headers.label: +"""Kafka Headers""" + +producer_kafka_ext_headers.desc: +"""Please provide more key-value pairs for Kafka headers
+The key-value pairs here will be combined with the +value of kafka_headers field before sending to Kafka.""" + +producer_kafka_ext_headers.label: +"""Extra Kafka headers""" + +producer_kafka_ext_header_key.desc: +"""Key of the Kafka header. Placeholders in format of ${var} are supported.""" + +producer_kafka_ext_header_key.label: +"""Kafka extra header key.""" + +producer_kafka_ext_header_value.desc: +"""Value of the Kafka header. Placeholders in format of ${var} are supported.""" + +producer_kafka_ext_header_value.label: +"""Value""" + +kafka_header_value_encode_mode.desc: +"""Kafka headers value encode mode
+ - NONE: only add binary values to Kafka headers;
+ - JSON: only add JSON values to Kafka headers, +and encode it to JSON strings before sending.""" + +kafka_header_value_encode_mode.label: +"""Kafka headers value encode mode""" + metadata_request_timeout.desc: """Maximum wait time when fetching metadata from Kafka."""