diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 362ead229..453370220 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -32,7 +32,7 @@ -define(kafka_producers, kafka_producers). query_mode(#{kafka := #{query_mode := sync}}) -> - simple_sync; + simple_sync_internal_buffer; query_mode(_) -> simple_async. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 4450030e0..b704fc92c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -133,7 +133,7 @@ t_query_mode(CtConfig) -> end, fun(Trace) -> %% We should have a sync Snabbkaffe trace - ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) + ?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace)) end ), ?check_trace( @@ -141,7 +141,7 @@ t_query_mode(CtConfig) -> publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) end, fun(Trace) -> - %% We should have a sync Snabbkaffe trace + %% We should have an async Snabbkaffe trace ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) end ), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 6a90a1e0a..fe4112c3d 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,11 +22,17 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. --type query_mode() :: simple_sync | simple_async | sync | async | no_queries. +-type query_mode() :: + simple_sync + | simple_async + | simple_sync_internal_buffer + | sync + | async + | no_queries. -type result() :: term(). -type reply_fun() :: - {fun((result(), Args :: term()) -> any()), Args :: term()} - | {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()} + {fun((...) -> any()), Args :: [term()]} + | {fun((...) -> any()), Args :: [term()], reply_context()} | undefined. -type reply_context() :: #{reply_dropped => boolean()}. -type query_opts() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 92701a8c4..cf8c9df9e 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -311,6 +311,12 @@ query(ResId, Request, Opts) -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); + {simple_sync_internal_buffer, _} -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_internal_buffer_query( + ResId, Request, Opts + ); {sync, _} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {async, _} -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 9026fe65c..2fab04d62 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -39,7 +39,8 @@ -export([ simple_sync_query/2, simple_sync_query/3, - simple_async_query/3 + simple_async_query/3, + simple_sync_internal_buffer_query/3 ]). -export([ @@ -53,7 +54,9 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]). +-export([ + handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3 +]). -export([clear_disk_queue_dir/2]). @@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) -> _ = handle_query_result(Id, Result, _HasBeenSent = false), Result. +%% This is a hack to handle cases where the underlying connector has internal buffering +%% (e.g.: Kafka and Pulsar producers). Since the message may be inernally retried at a +%% later time, we can't bump metrics immediatelly if the return value is not a success +%% (e.g.: if the call timed out, but the message was enqueued nevertheless). +-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term(). +simple_sync_internal_buffer_query(Id, Request, QueryOpts0) -> + ?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}), + ReplyAlias = alias([reply]), + try + MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined), + QueryOpts1 = QueryOpts0#{ + reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]} + }, + QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1), + case simple_async_query(Id, Request, QueryOpts) of + {error, _} = Error -> + Error; + {async_return, {error, _} = Error} -> + Error; + {async_return, {ok, _Pid}} -> + receive + {ReplyAlias, Response} -> + Response + after Timeout -> + _ = unalias(ReplyAlias), + receive + {ReplyAlias, Response} -> + Response + after 0 -> {error, timeout} + end + end + end + after + _ = unalias(ReplyAlias) + end. + simple_query_opts() -> ensure_expire_at(#{simple_query => true, timeout => infinity}). @@ -1908,6 +1947,12 @@ reply_call(Alias, Response) -> erlang:send(Alias, {Alias, Response}), ok. +%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to' +%% callbacks. +reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) -> + ?MODULE:reply_call(ReplyAlias, Response), + do_reply_caller(MaybeReplyTo, Response). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/changes/ee/fix-11724.en.md b/changes/ee/fix-11724.en.md new file mode 100644 index 000000000..633092357 --- /dev/null +++ b/changes/ee/fix-11724.en.md @@ -0,0 +1 @@ +Fixed a metrics issue where messages sent to Kafka would count as failed even when they were successfully sent late due to its internal buffering.