fix(kafka): use safe publish in consumer

Routing with v2 schema is actually more strict with respect to input to
`emqx_router` module routines. This causes Kafka consumer bridge to
crash when it tries to publish a message to a topic that looks like a
topic filter.
This commit is contained in:
Andrew Mayorov 2023-11-15 22:37:00 +07:00
parent 8919b08207
commit 893e90b372
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 1 additions and 1 deletions

View File

@ -251,7 +251,7 @@ do_handle_message(Message, State) ->
Payload = render(FullMessage, PayloadTemplate),
MQTTTopic = render(FullMessage, MQTTTopicTemplate),
MQTTMessage = emqx_message:make(ResourceId, MQTTQoS, MQTTTopic, Payload),
_ = emqx:publish(MQTTMessage),
_ = emqx_broker:safe_publish(MQTTMessage),
emqx_hooks:run(Hookpoint, [FullMessage]),
emqx_resource_metrics:received_inc(ResourceId),
%% note: just `ack' does not commit the offset to the