From 893e90b3724107af91d98f5a3aa917fac11652e4 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 Nov 2023 22:37:00 +0700 Subject: [PATCH] fix(kafka): use safe publish in consumer Routing with v2 schema is actually more strict with respect to input to `emqx_router` module routines. This causes Kafka consumer bridge to crash when it tries to publish a message to a topic that looks like a topic filter. --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 89cb9a78f..24ea4d300 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -251,7 +251,7 @@ do_handle_message(Message, State) -> Payload = render(FullMessage, PayloadTemplate), MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), - _ = emqx:publish(MQTTMessage), + _ = emqx_broker:safe_publish(MQTTMessage), emqx_hooks:run(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(ResourceId), %% note: just `ack' does not commit the offset to the