Merge pull request #11724 from thalesmg/fix-kprodu-sync-metrics-m-20231006

fix({kafka,pulsar}_producer): correctly handle metrics for connectors that have internal buffers
This commit is contained in:
Thales Macedo Garitezi 2023-10-10 12:13:33 -03:00 committed by GitHub
commit ee45145fb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 81 additions and 15 deletions

View File

@ -32,9 +32,9 @@
-define(kafka_producers, kafka_producers). -define(kafka_producers, kafka_producers).
query_mode(#{kafka := #{query_mode := sync}}) -> query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync; simple_sync_internal_buffer;
query_mode(_) -> query_mode(_) ->
simple_async. simple_async_internal_buffer.
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.

View File

@ -133,7 +133,7 @@ t_query_mode(CtConfig) ->
end, end,
fun(Trace) -> fun(Trace) ->
%% We should have a sync Snabbkaffe trace %% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) ?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace))
end end
), ),
?check_trace( ?check_trace(
@ -141,7 +141,7 @@ t_query_mode(CtConfig) ->
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end, end,
fun(Trace) -> fun(Trace) ->
%% We should have a sync Snabbkaffe trace %% We should have an async Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end end
), ),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [ {application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"}, {description, "EMQX Pulsar Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -73,7 +73,7 @@
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
query_mode(_Config) -> query_mode(_Config) ->
simple_async. simple_async_internal_buffer.
-spec on_start(resource_id(), config()) -> {ok, state()}. -spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) -> on_start(InstanceId, Config) ->

View File

@ -22,11 +22,18 @@
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped. -type resource_status() :: connected | disconnected | connecting | stopped.
-type callback_mode() :: always_sync | async_if_possible. -type callback_mode() :: always_sync | async_if_possible.
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries. -type query_mode() ::
simple_sync
| simple_async
| simple_sync_internal_buffer
| simple_async_internal_buffer
| sync
| async
| no_queries.
-type result() :: term(). -type result() :: term().
-type reply_fun() :: -type reply_fun() ::
{fun((result(), Args :: term()) -> any()), Args :: term()} {fun((...) -> any()), Args :: [term()]}
| {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()} | {fun((...) -> any()), Args :: [term()], reply_context()}
| undefined. | undefined.
-type reply_context() :: #{reply_dropped => boolean()}. -type reply_context() :: #{reply_dropped => boolean()}.
-type query_opts() :: #{ -type query_opts() :: #{
@ -36,7 +43,6 @@
expire_at => infinity | integer(), expire_at => infinity | integer(),
async_reply_fun => reply_fun(), async_reply_fun => reply_fun(),
simple_query => boolean(), simple_query => boolean(),
is_buffer_supported => boolean(),
reply_to => reply_fun() reply_to => reply_fun()
}. }.
-type resource_data() :: #{ -type resource_data() :: #{

View File

@ -311,6 +311,20 @@ query(ResId, Request, Opts) ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again %% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{simple_async_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync_internal_buffer, _} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
ResId, Request, Opts
);
{sync, _} -> {sync, _} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} -> {async, _} ->

View File

@ -39,7 +39,8 @@
-export([ -export([
simple_sync_query/2, simple_sync_query/2,
simple_sync_query/3, simple_sync_query/3,
simple_async_query/3 simple_async_query/3,
simple_sync_internal_buffer_query/3
]). ]).
-export([ -export([
@ -53,7 +54,9 @@
-export([queue_item_marshaller/1, estimate_size/1]). -export([queue_item_marshaller/1, estimate_size/1]).
-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]). -export([
handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
]).
-export([clear_disk_queue_dir/2]). -export([clear_disk_queue_dir/2]).
@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) ->
_ = handle_query_result(Id, Result, _HasBeenSent = false), _ = handle_query_result(Id, Result, _HasBeenSent = false),
Result. Result.
%% This is a hack to handle cases where the underlying connector has internal buffering
%% (e.g.: Kafka and Pulsar producers). Since the message may be inernally retried at a
%% later time, we can't bump metrics immediatelly if the return value is not a success
%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
ReplyAlias = alias([reply]),
try
MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
QueryOpts1 = QueryOpts0#{
reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
},
QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
case simple_async_query(Id, Request, QueryOpts) of
{error, _} = Error ->
Error;
{async_return, {error, _} = Error} ->
Error;
{async_return, {ok, _Pid}} ->
receive
{ReplyAlias, Response} ->
Response
after Timeout ->
_ = unalias(ReplyAlias),
receive
{ReplyAlias, Response} ->
Response
after 0 -> {error, timeout}
end
end
end
after
_ = unalias(ReplyAlias)
end.
simple_query_opts() -> simple_query_opts() ->
ensure_expire_at(#{simple_query => true, timeout => infinity}). ensure_expire_at(#{simple_query => true, timeout => infinity}).
@ -1049,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
end. end.
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_async; ResQM =:= simple_sync ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
-> ->
%% The connector supports buffer, send even in disconnected state %% The connector supports buffer, send even in disconnected state
#{mod := Mod, state := ResSt, callback_mode := CBM} = Resource, #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
@ -1908,6 +1947,12 @@ reply_call(Alias, Response) ->
erlang:send(Alias, {Alias, Response}), erlang:send(Alias, {Alias, Response}),
ok. ok.
%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
%% callbacks.
reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
?MODULE:reply_call(ReplyAlias, Response),
do_reply_caller(MaybeReplyTo, Response).
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() -> adjust_batch_time_test_() ->

View File

@ -147,9 +147,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
case QueryMode of case QueryMode of
%% the resource has built-in buffer, so there is no need for resource workers %% the resource has built-in buffer, so there is no need for resource workers
simple_sync -> simple_sync_internal_buffer ->
ok; ok;
simple_async -> simple_async_internal_buffer ->
ok; ok;
%% The resource is a consumer resource, so there is no need for resource workers %% The resource is a consumer resource, so there is no need for resource workers
no_queries -> no_queries ->

View File

@ -0,0 +1 @@
Fixed a metrics issue where messages sent to Kafka would count as failed even when they were successfully sent late due to its internal buffering.