diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 89cb9a78f..24ea4d300 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -251,7 +251,7 @@ do_handle_message(Message, State) -> Payload = render(FullMessage, PayloadTemplate), MQTTTopic = render(FullMessage, MQTTTopicTemplate), MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload), - _ = emqx:publish(MQTTMessage), + _ = emqx_broker:safe_publish(MQTTMessage), emqx_hooks:run(Hookpoint, [FullMessage]), emqx_resource_metrics:received_inc(ResourceId), %% note: just `ack' does not commit the offset to the