diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 3485ac752..362ead229 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_impl_producer). +-behaviour(emqx_resource). + -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 532c769e2..8092fadc8 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.23"}, + {vsn, "0.1.24"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index ffb03209b..92701a8c4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -306,8 +306,7 @@ query(ResId, Request, Opts) -> {simple_async, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again - Opts1 = Opts#{is_buffer_supported => true}, - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); {simple_sync, _} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c75770673..9026fe65c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1048,7 +1048,9 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> ?RESOURCE_ERROR(not_found, "resource not found") end. -do_call_query(QM, Id, Index, Ref, Query, #{is_buffer_supported := true} = QueryOpts, Resource) -> +do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when + ResQM =:= simple_async; ResQM =:= simple_sync +-> %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, CallMode = call_mode(QM, CBM), @@ -1059,7 +1061,7 @@ do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Res #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, CallMode = call_mode(QM, CBM), apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, QueryOpts); -do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> +do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Resource) -> ?RESOURCE_ERROR(not_connected, "resource not connected"). -define(APPLY_RESOURCE(NAME, EXPR, REQ), diff --git a/changes/ee/fix-11722.en.md b/changes/ee/fix-11722.en.md new file mode 100644 index 000000000..3a8977a7a --- /dev/null +++ b/changes/ee/fix-11722.en.md @@ -0,0 +1 @@ +Fixed an issue where a Kafka Producer bridge with `sync` query mode would not buffer messages when in the `connecting` state.