diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 16e9561e8..0b8f526f6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -664,6 +664,9 @@ kvlist_headers([#{<<"key">> := K, <<"value">> := V} | Headers], Acc) -> kvlist_headers(Headers, [{K, V} | Acc]); kvlist_headers([{K, V} | Headers], Acc) -> kvlist_headers(Headers, [{K, V} | Acc]); +kvlist_headers([KVList | Headers], Acc) when is_list(KVList) -> + %% for instance, when user sets a json list as headers like '[{"foo":"bar"}, {"foo2":"bar2"}]'. + kvlist_headers(KVList ++ Headers, Acc); kvlist_headers([BadHeader | _], _) -> throw({bad_kafka_header, BadHeader}). @@ -694,7 +697,7 @@ merge_kafka_headers(HeadersTks, ExtHeaders, Msg) -> [undefined] -> ExtHeaders; [MaybeJson] when is_binary(MaybeJson) -> - case emqx_utils_json:safe_decode(MaybeJson) of + case emqx_utils_json:safe_decode(MaybeJson, [return_maps]) of {ok, JsonTerm} when is_map(JsonTerm) -> maps:to_list(JsonTerm) ++ ExtHeaders; {ok, JsonTerm} when is_list(JsonTerm) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 00fa21626..ba11ddf14 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -560,7 +560,7 @@ t_send_message_with_headers(Config) -> "kafka_hosts_string" => HostsString, "kafka_topic" => KafkaTopic, "instance_id" => ResourceId, - "kafka_headers" => <<"${pub_props}">>, + "kafka_headers" => <<"${payload.header}">>, "kafka_ext_headers" => emqx_utils_json:encode( [ #{ @@ -568,8 +568,8 @@ t_send_message_with_headers(Config) -> <<"kafka_ext_header_value">> => <<"${clientid}">> }, #{ - <<"kafka_ext_header_key">> => <<"payload">>, - <<"kafka_ext_header_value">> => <<"${payload}">> + <<"kafka_ext_header_key">> => <<"ext_header_val">>, + <<"kafka_ext_header_value">> => <<"${payload.ext_header_val}">> } ] ), @@ -587,12 +587,42 @@ t_send_message_with_headers(Config) -> ), ConfigAtom = ConfigAtom1#{bridge_name => Name}, {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Msg = #{ - clientid => BinTime, - payload => <<"payload">>, - timestamp => Time + Time1 = erlang:unique_integer(), + BinTime1 = integer_to_binary(Time1), + Payload1 = emqx_utils_json:encode( + #{ + <<"header">> => #{ + <<"foo">> => <<"bar">> + }, + <<"ext_header_val">> => <<"ext header ok">> + } + ), + Msg1 = #{ + clientid => BinTime1, + payload => Payload1, + timestamp => Time1 + }, + Time2 = erlang:unique_integer(), + BinTime2 = integer_to_binary(Time2), + Payload2 = emqx_utils_json:encode( + #{ + <<"header">> => [ + #{ + <<"key">> => <<"foo1">>, + <<"value">> => <<"bar1">> + }, + #{ + <<"key">> => <<"foo2">>, + <<"value">> => <<"bar2">> + } + ], + <<"ext_header_val">> => <<"ext header ok">> + } + ), + Msg2 = #{ + clientid => BinTime2, + payload => Payload2, + timestamp => Time2 }, {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), ct:pal("base offset before testing ~p", [Offset]), @@ -603,7 +633,8 @@ t_send_message_with_headers(Config) -> end, ?check_trace( begin - ok = send(Config, ResourceId, Msg, State) + ok = send(Config, ResourceId, Msg1, State), + ok = send(Config, ResourceId, Msg2, State) end, fun(Trace) -> ?assertMatch( @@ -616,11 +647,27 @@ t_send_message_with_headers(Config) -> [{var, [<<"clientid">>]}] }, { - [{str, <<"payload">>}], - [{var, [<<"payload">>]}] + [{str, <<"ext_header_val">>}], + [{var, [<<"payload">>, <<"ext_header_val">>]}] } ], - headers_tokens := [{var, [<<"pub_props">>]}], + headers_tokens := [{var, [<<"payload">>, <<"header">>]}], + headers_val_encode_mode := json + } + }, + #{ + headers_config := #{ + ext_headers_tokens := [ + { + [{str, <<"clientid">>}], + [{var, [<<"clientid">>]}] + }, + { + [{str, <<"ext_header_val">>}], + [{var, [<<"payload">>, <<"ext_header_val">>]}] + } + ], + headers_tokens := [{var, [<<"payload">>, <<"header">>]}], headers_val_encode_mode := json } } @@ -629,16 +676,28 @@ t_send_message_with_headers(Config) -> ) end ), - {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + {ok, {_, KafkaMsgs}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch( - #kafka_message{ - headers = [ - {<<"clientid">>, _}, - {<<"payload">>, <<"\"payload\"">>} - ], - key = BinTime - }, - KafkaMsg + [ + #kafka_message{ + headers = [ + {<<"foo">>, <<"\"bar\"">>}, + {<<"clientid">>, _}, + {<<"ext_header_val">>, <<"\"ext header ok\"">>} + ], + key = BinTime1 + }, + #kafka_message{ + headers = [ + {<<"foo1">>, <<"\"bar1\"">>}, + {<<"foo2">>, <<"\"bar2\"">>}, + {<<"clientid">>, _}, + {<<"ext_header_val">>, <<"\"ext header ok\"">>} + ], + key = BinTime2 + } + ], + KafkaMsgs ), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), @@ -769,7 +828,7 @@ t_wrong_headers_from_message(Config) -> timestamp => Time2 }, ?assertError( - {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"foo">> := <<"bar">>}}}}}, send(Config, ResourceId, Msg2, State) ), Time3 = erlang:unique_integer(), @@ -780,23 +839,9 @@ t_wrong_headers_from_message(Config) -> timestamp => Time3 }, ?assertError( - {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"key">> := <<"foo">>}}}}}, send(Config, ResourceId, Msg3, State) ), - Time4 = erlang:unique_integer(), - Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>, - Msg4 = #{ - clientid => integer_to_binary(Time4), - payload => Payload4, - timestamp => Time4 - }, - ?assertError( - {badmatch, - {error, - {unrecoverable_error, - {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}}, - send(Config, ResourceId, Msg4, State) - ), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), ?assertEqual([], supervisor:which_children(wolff_client_sup)), diff --git a/changes/ee/fix-11527.en.md b/changes/ee/fix-11527.en.md new file mode 100644 index 000000000..33d077e94 --- /dev/null +++ b/changes/ee/fix-11527.en.md @@ -0,0 +1 @@ +Fixed an issue with Kafka header handling when placeholders resolve to an array of key-value pairs (e.g.: `[{"key": "foo", "value": "bar"}]`).