Merge pull request #11527 from paulozulato/fix-kafka-header-template-handling

fix(kafka): fix template processing for header
This commit is contained in:
Paulo Zulato 2023-08-28 18:55:33 -03:00 committed by GitHub
commit 2c89be04d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 39 deletions

View File

@ -664,6 +664,9 @@ kvlist_headers([#{<<"key">> := K, <<"value">> := V} | Headers], Acc) ->
kvlist_headers(Headers, [{K, V} | Acc]); kvlist_headers(Headers, [{K, V} | Acc]);
kvlist_headers([{K, V} | Headers], Acc) -> kvlist_headers([{K, V} | Headers], Acc) ->
kvlist_headers(Headers, [{K, V} | Acc]); kvlist_headers(Headers, [{K, V} | Acc]);
kvlist_headers([KVList | Headers], Acc) when is_list(KVList) ->
%% for instance, when user sets a json list as headers like '[{"foo":"bar"}, {"foo2":"bar2"}]'.
kvlist_headers(KVList ++ Headers, Acc);
kvlist_headers([BadHeader | _], _) -> kvlist_headers([BadHeader | _], _) ->
throw({bad_kafka_header, BadHeader}). throw({bad_kafka_header, BadHeader}).
@ -694,7 +697,7 @@ merge_kafka_headers(HeadersTks, ExtHeaders, Msg) ->
[undefined] -> [undefined] ->
ExtHeaders; ExtHeaders;
[MaybeJson] when is_binary(MaybeJson) -> [MaybeJson] when is_binary(MaybeJson) ->
case emqx_utils_json:safe_decode(MaybeJson) of case emqx_utils_json:safe_decode(MaybeJson, [return_maps]) of
{ok, JsonTerm} when is_map(JsonTerm) -> {ok, JsonTerm} when is_map(JsonTerm) ->
maps:to_list(JsonTerm) ++ ExtHeaders; maps:to_list(JsonTerm) ++ ExtHeaders;
{ok, JsonTerm} when is_list(JsonTerm) -> {ok, JsonTerm} when is_list(JsonTerm) ->

View File

@ -560,7 +560,7 @@ t_send_message_with_headers(Config) ->
"kafka_hosts_string" => HostsString, "kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic, "kafka_topic" => KafkaTopic,
"instance_id" => ResourceId, "instance_id" => ResourceId,
"kafka_headers" => <<"${pub_props}">>, "kafka_headers" => <<"${payload.header}">>,
"kafka_ext_headers" => emqx_utils_json:encode( "kafka_ext_headers" => emqx_utils_json:encode(
[ [
#{ #{
@ -568,8 +568,8 @@ t_send_message_with_headers(Config) ->
<<"kafka_ext_header_value">> => <<"${clientid}">> <<"kafka_ext_header_value">> => <<"${clientid}">>
}, },
#{ #{
<<"kafka_ext_header_key">> => <<"payload">>, <<"kafka_ext_header_key">> => <<"ext_header_val">>,
<<"kafka_ext_header_value">> => <<"${payload}">> <<"kafka_ext_header_value">> => <<"${payload.ext_header_val}">>
} }
] ]
), ),
@ -587,12 +587,42 @@ t_send_message_with_headers(Config) ->
), ),
ConfigAtom = ConfigAtom1#{bridge_name => Name}, ConfigAtom = ConfigAtom1#{bridge_name => Name},
{ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom),
Time = erlang:unique_integer(), Time1 = erlang:unique_integer(),
BinTime = integer_to_binary(Time), BinTime1 = integer_to_binary(Time1),
Msg = #{ Payload1 = emqx_utils_json:encode(
clientid => BinTime, #{
payload => <<"payload">>, <<"header">> => #{
timestamp => Time <<"foo">> => <<"bar">>
},
<<"ext_header_val">> => <<"ext header ok">>
}
),
Msg1 = #{
clientid => BinTime1,
payload => Payload1,
timestamp => Time1
},
Time2 = erlang:unique_integer(),
BinTime2 = integer_to_binary(Time2),
Payload2 = emqx_utils_json:encode(
#{
<<"header">> => [
#{
<<"key">> => <<"foo1">>,
<<"value">> => <<"bar1">>
},
#{
<<"key">> => <<"foo2">>,
<<"value">> => <<"bar2">>
}
],
<<"ext_header_val">> => <<"ext header ok">>
}
),
Msg2 = #{
clientid => BinTime2,
payload => Payload2,
timestamp => Time2
}, },
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
ct:pal("base offset before testing ~p", [Offset]), ct:pal("base offset before testing ~p", [Offset]),
@ -603,7 +633,8 @@ t_send_message_with_headers(Config) ->
end, end,
?check_trace( ?check_trace(
begin begin
ok = send(Config, ResourceId, Msg, State) ok = send(Config, ResourceId, Msg1, State),
ok = send(Config, ResourceId, Msg2, State)
end, end,
fun(Trace) -> fun(Trace) ->
?assertMatch( ?assertMatch(
@ -616,11 +647,27 @@ t_send_message_with_headers(Config) ->
[{var, [<<"clientid">>]}] [{var, [<<"clientid">>]}]
}, },
{ {
[{str, <<"payload">>}], [{str, <<"ext_header_val">>}],
[{var, [<<"payload">>]}] [{var, [<<"payload">>, <<"ext_header_val">>]}]
} }
], ],
headers_tokens := [{var, [<<"pub_props">>]}], headers_tokens := [{var, [<<"payload">>, <<"header">>]}],
headers_val_encode_mode := json
}
},
#{
headers_config := #{
ext_headers_tokens := [
{
[{str, <<"clientid">>}],
[{var, [<<"clientid">>]}]
},
{
[{str, <<"ext_header_val">>}],
[{var, [<<"payload">>, <<"ext_header_val">>]}]
}
],
headers_tokens := [{var, [<<"payload">>, <<"header">>]}],
headers_val_encode_mode := json headers_val_encode_mode := json
} }
} }
@ -629,16 +676,28 @@ t_send_message_with_headers(Config) ->
) )
end end
), ),
{ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), {ok, {_, KafkaMsgs}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
?assertMatch( ?assertMatch(
[
#kafka_message{ #kafka_message{
headers = [ headers = [
{<<"foo">>, <<"\"bar\"">>},
{<<"clientid">>, _}, {<<"clientid">>, _},
{<<"payload">>, <<"\"payload\"">>} {<<"ext_header_val">>, <<"\"ext header ok\"">>}
], ],
key = BinTime key = BinTime1
}, },
KafkaMsg #kafka_message{
headers = [
{<<"foo1">>, <<"\"bar1\"">>},
{<<"foo2">>, <<"\"bar2\"">>},
{<<"clientid">>, _},
{<<"ext_header_val">>, <<"\"ext header ok\"">>}
],
key = BinTime2
}
],
KafkaMsgs
), ),
%% TODO: refactor those into init/end per testcase %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State), ok = ?PRODUCER:on_stop(ResourceId, State),
@ -769,7 +828,7 @@ t_wrong_headers_from_message(Config) ->
timestamp => Time2 timestamp => Time2
}, },
?assertError( ?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"foo">> := <<"bar">>}}}}},
send(Config, ResourceId, Msg2, State) send(Config, ResourceId, Msg2, State)
), ),
Time3 = erlang:unique_integer(), Time3 = erlang:unique_integer(),
@ -780,23 +839,9 @@ t_wrong_headers_from_message(Config) ->
timestamp => Time3 timestamp => Time3
}, },
?assertError( ?assertError(
{badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, {badmatch, {error, {unrecoverable_error, {bad_kafka_header, #{<<"key">> := <<"foo">>}}}}},
send(Config, ResourceId, Msg3, State) send(Config, ResourceId, Msg3, State)
), ),
Time4 = erlang:unique_integer(),
Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>,
Msg4 = #{
clientid => integer_to_binary(Time4),
payload => Payload4,
timestamp => Time4
},
?assertError(
{badmatch,
{error,
{unrecoverable_error,
{bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}},
send(Config, ResourceId, Msg4, State)
),
%% TODO: refactor those into init/end per testcase %% TODO: refactor those into init/end per testcase
ok = ?PRODUCER:on_stop(ResourceId, State), ok = ?PRODUCER:on_stop(ResourceId, State),
?assertEqual([], supervisor:which_children(wolff_client_sup)), ?assertEqual([], supervisor:which_children(wolff_client_sup)),

View File

@ -0,0 +1 @@
Fixed an issue with Kafka header handling when placeholders resolve to an array of key-value pairs (e.g.: `[{"key": "foo", "value": "bar"}]`).