From 34186fcc744da8f48d8855e89b517f11584b08dd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 6 Oct 2023 11:36:42 -0300 Subject: [PATCH] fix(kafka_producer): send messages to wolff producer to buffer even when connector is in `connecting` state Fixes https://emqx.atlassian.net/browse/EMQX-11085 Messages would not be sent to wolff if the connection was down, so they were effectively lost. --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 ++ apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_resource/src/emqx_resource.erl | 3 +-- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 6 ++++-- changes/ee/fix-11722.en.md | 1 + 5 files changed, 9 insertions(+), 5 deletions(-) create mode 100644 changes/ee/fix-11722.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 3485ac752..362ead229 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-behaviour(emqx_resource). + -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 532c769e2..8092fadc8 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.23"}, + {vsn, "0.1.24"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index ffb03209b..92701a8c4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -306,8 +306,7 @@ query(ResId, Request, Opts) -> {simple_async, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again - Opts1 = Opts#{is_buffer_supported => true}, - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); {simple_sync, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c75770673..9026fe65c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1048,7 +1048,9 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> ?RESOURCE_ERROR(not_found, "resource not found") end. -do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) -> +do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when + ResQM =:= simple_async; ResQM =:= simple_sync +-> %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, CallMode = call_mode(QM, CBM), @@ -1059,7 +1061,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Res #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, CallMode = call_mode(QM, CBM), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); -do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> +do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Resource) -> ?RESOURCE_ERROR(not_connected, "resource not connected"). -define(APPLY_RESOURCE(NAME, EXPR, REQ), diff --git a/changes/ee/fix-11722.en.md b/changes/ee/fix-11722.en.md new file mode 100644 index 000000000..3a8977a7a --- /dev/null +++ b/changes/ee/fix-11722.en.md @@ -0,0 +1 @@ +Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.