diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 453370220..749250306 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -34,7 +34,7 @@ query_mode(#{kafka := #{query_mode := sync}}) -> simple_sync_internal_buffer; query_mode(_) -> - simple_async. + simple_async_internal_buffer. callback_mode() -> async_if_possible. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index cf8c9df9e..df50625ab 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -311,7 +311,15 @@ query(ResId, Request, Opts) -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); + {simple_async_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); {simple_sync_internal_buffer, _} -> + %% This is for bridges/connectors that have internal buffering, such + %% as Kafka and Pulsar producers. %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_internal_buffer_query( diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 2fab04d62..a47884161 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1088,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> end. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when - ResQM =:= simple_async; ResQM =:= simple_sync + ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer -> %% The connector supports buffer, send even in disconnected state #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,