diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index a39259d2e..16e9561e8 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -213,7 +213,7 @@ on_stop(InstanceId, _State) -> ok. on_query( - _InstId, + InstId, {send_message, Message}, #{ message_template := Template, @@ -229,19 +229,34 @@ on_query( ext_headers_tokens => KafkaExtHeadersTokens, headers_val_encode_mode => KafkaHeadersValEncodeMode }, - KafkaMessage = render_message(Template, KafkaHeaders, Message), - ?tp( - emqx_bridge_kafka_impl_producer_sync_query, - #{headers_config => KafkaHeaders, instance_id => _InstId} - ), try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), - ok + KafkaMessage = render_message(Template, KafkaHeaders, Message), + ?tp( + emqx_bridge_kafka_impl_producer_sync_query, + #{headers_config => KafkaHeaders, instance_id => InstId} + ), + do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) catch - error:{producer_down, _} = Reason -> - {error, Reason}; - error:timeout -> - {error, timeout} + throw:{bad_kafka_header, _} = Error -> + ?tp( + emqx_bridge_kafka_impl_producer_sync_query_failed, + #{ + headers_config => KafkaHeaders, + instance_id => InstId, + reason => Error + } + ), + {error, {unrecoverable_error, Error}}; + throw:{bad_kafka_headers, _} = Error -> + ?tp( + emqx_bridge_kafka_impl_producer_sync_query_failed, + #{ + headers_config => KafkaHeaders, + instance_id => InstId, + reason => Error + } + ), + {error, {unrecoverable_error, Error}} end. %% @doc The callback API for rule-engine (or bridge without rules) @@ -251,7 +266,7 @@ on_query( %% E.g. the output of rule-engine process chain %% or the direct mapping from an MQTT message. 
on_query_async( - _InstId, + InstId, {send_message, Message}, AsyncReplyFn, #{ @@ -267,21 +282,35 @@ on_query_async( ext_headers_tokens => KafkaExtHeadersTokens, headers_val_encode_mode => KafkaHeadersValEncodeMode }, - KafkaMessage = render_message(Template, KafkaHeaders, Message), - ?tp( - emqx_bridge_kafka_impl_producer_async_query, - #{headers_config => KafkaHeaders, instance_id => _InstId} - ), - %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs - %% * Must be a single element batch because wolff books calls, but not batch sizes - %% for counters and gauges. - Batch = [KafkaMessage], - %% The retuned information is discarded here. - %% If the producer process is down when sending, this function would - %% raise an error exception which is to be caught by the caller of this callback - {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), - %% this Pid is so far never used because Kafka producer is by-passing the buffer worker - {ok, Pid}. + try + KafkaMessage = render_message(Template, KafkaHeaders, Message), + ?tp( + emqx_bridge_kafka_impl_producer_async_query, + #{headers_config => KafkaHeaders, instance_id => InstId} + ), + do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) + catch + throw:{bad_kafka_header, _} = Error -> + ?tp( + emqx_bridge_kafka_impl_producer_async_query_failed, + #{ + headers_config => KafkaHeaders, + instance_id => InstId, + reason => Error + } + ), + {error, {unrecoverable_error, Error}}; + throw:{bad_kafka_headers, _} = Error -> + ?tp( + emqx_bridge_kafka_impl_producer_async_query_failed, + #{ + headers_config => KafkaHeaders, + instance_id => InstId, + reason => Error + } + ), + {error, {unrecoverable_error, Error}} + end. compile_message_template(T) -> KeyTemplate = maps:get(key, T, <<"${.clientid}">>), @@ -337,6 +366,28 @@ render_timestamp(Template, Message) -> erlang:system_time(millisecond) end. 
+do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) -> + try + {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), + ok + catch + error:{producer_down, _} = Reason -> + {error, Reason}; + error:timeout -> + {error, timeout} + end; +do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) -> + %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs + %% * Must be a single element batch because wolff books calls, but not batch sizes + %% for counters and gauges. + Batch = [KafkaMessage], + %% The returned information is discarded here. + %% If the producer process is down when sending, this function would + %% raise an error exception which is to be caught by the caller of this callback + {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}), + %% this Pid is so far never used because Kafka producer is by-passing the buffer worker + {ok, Pid}. + %% Wolff producer never gives up retrying %% so there can only be 'ok' results. on_kafka_ack(_Partition, Offset, {ReplyFn, Args}) when is_integer(Offset) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index b920b39ae..00fa21626 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -721,6 +721,90 @@ t_wrong_headers(_Config) -> ), ok. 
+t_wrong_headers_from_message(Config) -> + HostsString = kafka_hosts_string_sasl(), + AuthSettings = valid_sasl_plain_settings(), + Hash = erlang:phash2([HostsString, ?FUNCTION_NAME]), + Type = ?BRIDGE_TYPE, + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + KafkaTopic = "test-topic-one-partition", + Conf = config_with_headers(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => ResourceId, + "kafka_headers" => <<"${payload}">>, + "producer" => #{ + "kafka" => #{ + "buffer" => #{ + "memory_overload_protection" => false + } + } + }, + "ssl" => #{} + }), + {ok, #{config := ConfigAtom1}} = emqx_bridge:create( + Type, erlang:list_to_atom(Name), Conf + ), + ConfigAtom = ConfigAtom1#{bridge_name => Name}, + {ok, State} = ?PRODUCER:on_start(ResourceId, ConfigAtom), + Time1 = erlang:unique_integer(), + Payload1 = <<"wrong_header">>, + Msg1 = #{ + clientid => integer_to_binary(Time1), + payload => Payload1, + timestamp => Time1 + }, + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_headers, Payload1}}}}, + send(Config, ResourceId, Msg1, State) + ), + Time2 = erlang:unique_integer(), + Payload2 = <<"[{\"foo\":\"bar\"}, {\"foo2\":\"bar2\"}]">>, + Msg2 = #{ + clientid => integer_to_binary(Time2), + payload => Payload2, + timestamp => Time2 + }, + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"foo">>, <<"bar">>}]}}}}, + send(Config, ResourceId, Msg2, State) + ), + Time3 = erlang:unique_integer(), + Payload3 = <<"[{\"key\":\"foo\"}, {\"value\":\"bar\"}]">>, + Msg3 = #{ + clientid => integer_to_binary(Time3), + payload => Payload3, + timestamp => Time3 + }, + ?assertError( + {badmatch, {error, {unrecoverable_error, {bad_kafka_header, [{<<"key">>, <<"foo">>}]}}}}, + send(Config, ResourceId, Msg3, State) + ), + Time4 = 
erlang:unique_integer(), + Payload4 = <<"[{\"key\":\"foo\", \"value\":\"bar\"}]">>, + Msg4 = #{ + clientid => integer_to_binary(Time4), + payload => Payload4, + timestamp => Time4 + }, + ?assertError( + {badmatch, + {error, + {unrecoverable_error, + {bad_kafka_header, [{<<"key">>, <<"foo">>}, {<<"value">>, <<"bar">>}]}}}}, + send(Config, ResourceId, Msg4, State) + ), + %% TODO: refactor the bridge setup and teardown in this testcase into init/end per testcase + ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), + ok = emqx_bridge_resource:remove(BridgeId), + delete_all_bridges(), + ok. + %%------------------------------------------------------------------------------ %% Helper functions %%------------------------------------------------------------------------------ diff --git a/changes/ee/fix-11508.en.md b/changes/ee/fix-11508.en.md new file mode 100644 index 000000000..54ea90db3 --- /dev/null +++ b/changes/ee/fix-11508.en.md @@ -0,0 +1 @@ +Fix message error handling on Kafka bridge when headers translate to an invalid value.