fix(kafka_producer): correctly handle metrics for connector that have internal buffers
Fixes https://emqx.atlassian.net/browse/EMQX-11086 There’s currently a metric inconsistency due to the internal buffering nature of Kafka Producer (wolff). We use simple_sync_query to call the Kafka Producer bridge. If that times out, the call is accounted as failed, even though the message is buffered in wolff and later sent successfully.
This commit is contained in:
parent
c60915293a
commit
79cf0a2ced
|
@ -32,7 +32,7 @@
|
||||||
-define(kafka_producers, kafka_producers).
|
-define(kafka_producers, kafka_producers).
|
||||||
|
|
||||||
query_mode(#{kafka := #{query_mode := sync}}) ->
|
query_mode(#{kafka := #{query_mode := sync}}) ->
|
||||||
simple_sync;
|
simple_sync_internal_buffer;
|
||||||
query_mode(_) ->
|
query_mode(_) ->
|
||||||
simple_async.
|
simple_async.
|
||||||
|
|
||||||
|
|
|
@ -133,7 +133,7 @@ t_query_mode(CtConfig) ->
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
%% We should have a sync Snabbkaffe trace
|
%% We should have a sync Snabbkaffe trace
|
||||||
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
|
?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace))
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
|
@ -141,7 +141,7 @@ t_query_mode(CtConfig) ->
|
||||||
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
|
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
%% We should have a sync Snabbkaffe trace
|
%% We should have an async Snabbkaffe trace
|
||||||
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
|
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
|
|
@ -22,11 +22,17 @@
|
||||||
-type resource_state() :: term().
|
-type resource_state() :: term().
|
||||||
-type resource_status() :: connected | disconnected | connecting | stopped.
|
-type resource_status() :: connected | disconnected | connecting | stopped.
|
||||||
-type callback_mode() :: always_sync | async_if_possible.
|
-type callback_mode() :: always_sync | async_if_possible.
|
||||||
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
|
-type query_mode() ::
|
||||||
|
simple_sync
|
||||||
|
| simple_async
|
||||||
|
| simple_sync_internal_buffer
|
||||||
|
| sync
|
||||||
|
| async
|
||||||
|
| no_queries.
|
||||||
-type result() :: term().
|
-type result() :: term().
|
||||||
-type reply_fun() ::
|
-type reply_fun() ::
|
||||||
{fun((result(), Args :: term()) -> any()), Args :: term()}
|
{fun((...) -> any()), Args :: [term()]}
|
||||||
| {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
|
| {fun((...) -> any()), Args :: [term()], reply_context()}
|
||||||
| undefined.
|
| undefined.
|
||||||
-type reply_context() :: #{reply_dropped => boolean()}.
|
-type reply_context() :: #{reply_dropped => boolean()}.
|
||||||
-type query_opts() :: #{
|
-type query_opts() :: #{
|
||||||
|
|
|
@ -311,6 +311,12 @@ query(ResId, Request, Opts) ->
|
||||||
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
||||||
%% so the buffer worker does not need to lookup the cache again
|
%% so the buffer worker does not need to lookup the cache again
|
||||||
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
|
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
|
||||||
|
{simple_sync_internal_buffer, _} ->
|
||||||
|
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
||||||
|
%% so the buffer worker does not need to lookup the cache again
|
||||||
|
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
|
||||||
|
ResId, Request, Opts
|
||||||
|
);
|
||||||
{sync, _} ->
|
{sync, _} ->
|
||||||
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
|
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
|
||||||
{async, _} ->
|
{async, _} ->
|
||||||
|
|
|
@ -39,7 +39,8 @@
|
||||||
-export([
|
-export([
|
||||||
simple_sync_query/2,
|
simple_sync_query/2,
|
||||||
simple_sync_query/3,
|
simple_sync_query/3,
|
||||||
simple_async_query/3
|
simple_async_query/3,
|
||||||
|
simple_sync_internal_buffer_query/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -53,7 +54,9 @@
|
||||||
|
|
||||||
-export([queue_item_marshaller/1, estimate_size/1]).
|
-export([queue_item_marshaller/1, estimate_size/1]).
|
||||||
|
|
||||||
-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
|
-export([
|
||||||
|
handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
|
||||||
|
]).
|
||||||
|
|
||||||
-export([clear_disk_queue_dir/2]).
|
-export([clear_disk_queue_dir/2]).
|
||||||
|
|
||||||
|
@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) ->
|
||||||
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
%% This is a hack to handle cases where the underlying connector has internal buffering
|
||||||
|
%% (e.g.: Kafka and Pulsar producers). Since the message may be inernally retried at a
|
||||||
|
%% later time, we can't bump metrics immediatelly if the return value is not a success
|
||||||
|
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
|
||||||
|
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
|
||||||
|
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
|
||||||
|
?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
|
||||||
|
ReplyAlias = alias([reply]),
|
||||||
|
try
|
||||||
|
MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
||||||
|
QueryOpts1 = QueryOpts0#{
|
||||||
|
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
|
||||||
|
},
|
||||||
|
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
|
||||||
|
case simple_async_query(Id, Request, QueryOpts) of
|
||||||
|
{error, _} = Error ->
|
||||||
|
Error;
|
||||||
|
{async_return, {error, _} = Error} ->
|
||||||
|
Error;
|
||||||
|
{async_return, {ok, _Pid}} ->
|
||||||
|
receive
|
||||||
|
{ReplyAlias, Response} ->
|
||||||
|
Response
|
||||||
|
after Timeout ->
|
||||||
|
_ = unalias(ReplyAlias),
|
||||||
|
receive
|
||||||
|
{ReplyAlias, Response} ->
|
||||||
|
Response
|
||||||
|
after 0 -> {error, timeout}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
after
|
||||||
|
_ = unalias(ReplyAlias)
|
||||||
|
end.
|
||||||
|
|
||||||
simple_query_opts() ->
|
simple_query_opts() ->
|
||||||
ensure_expire_at(#{simple_query => true, timeout => infinity}).
|
ensure_expire_at(#{simple_query => true, timeout => infinity}).
|
||||||
|
|
||||||
|
@ -1908,6 +1947,12 @@ reply_call(Alias, Response) ->
|
||||||
erlang:send(Alias, {Alias, Response}),
|
erlang:send(Alias, {Alias, Response}),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
|
||||||
|
%% callbacks.
|
||||||
|
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
|
||||||
|
?MODULE:reply_call(ReplyAlias, Response),
|
||||||
|
do_reply_caller(MaybeReplyTo, Response).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
adjust_batch_time_test_() ->
|
adjust_batch_time_test_() ->
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed a metrics issue where messages sent to Kafka would count as failed even when they were successfully sent late due to its internal buffering.
|
Loading…
Reference in New Issue