Merge pull request #10970 from kjellwinblad/kjell/feat/kafka_add_async_param/EMQX-8631

feat: add sync/async option to the Kafka producer bridge
This commit is contained in:
Kjell Winblad 2023-06-09 12:51:23 +02:00 committed by GitHub
commit 4215da12f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 166 additions and 99 deletions

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -64,8 +63,6 @@ fields(config) ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{

View File

@ -22,8 +22,7 @@
on_query_async/4,
on_batch_query/3,
on_batch_query_async/4,
on_get_status/2,
is_buffer_supported/0
on_get_status/2
]).
-export([reply_delegator/3]).
@ -56,8 +55,6 @@
%% emqx_resource API
%%-------------------------------------------------------------------------------------------------
is_buffer_supported() -> false.
callback_mode() -> async_if_possible.
-spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}.

View File

@ -294,7 +294,23 @@ fields(producer_kafka_opts) ->
mk(ref(producer_buffer), #{
required => false,
desc => ?DESC(producer_buffer)
})}
})},
{query_mode,
mk(
enum([async, sync]),
#{
default => async,
desc => ?DESC(query_mode)
}
)},
{sync_query_timeout,
mk(
emqx_schema:timeout_duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC(sync_query_timeout)
}
)}
];
fields(kafka_message) ->
[

View File

@ -8,7 +8,7 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
query_mode/1,
on_start/2,
on_stop/2,
on_get_status/2
@ -112,11 +112,9 @@
callback_mode() ->
async_if_possible.
%% there are no queries to be made to this bridge, so we say that
%% buffer is supported so we don't spawn unused resource buffer
%% workers.
is_buffer_supported() ->
true.
%% consumer bridges don't need resource workers
query_mode(_Config) ->
no_queries.
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(ResourceId, Config) ->

View File

@ -4,10 +4,11 @@
-module(emqx_bridge_kafka_impl_producer).
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
%% callbacks of behaviour emqx_resource
-export([
is_buffer_supported/0,
query_mode/1,
callback_mode/0,
on_start/2,
on_stop/2,
@ -32,7 +33,10 @@
%% to hocon; keeping this as just `kafka' for backwards compatibility.
-define(BRIDGE_TYPE, kafka).
is_buffer_supported() -> true.
query_mode(#{kafka := #{query_mode := sync}}) ->
simple_sync;
query_mode(_) ->
simple_async.
callback_mode() -> async_if_possible.
@ -43,7 +47,11 @@ on_start(InstId, Config) ->
bootstrap_hosts := Hosts0,
bridge_name := BridgeName,
connect_timeout := ConnTimeout,
kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic},
kafka := KafkaConfig = #{
message := MessageTemplate,
topic := KafkaTopic,
sync_query_timeout := SyncQueryTimeout
},
metadata_request_timeout := MetaReqTimeout,
min_metadata_refresh_interval := MinMetaRefreshInterval,
socket_opts := SocketOpts,
@ -99,7 +107,8 @@ on_start(InstId, Config) ->
client_id => ClientId,
kafka_topic => KafkaTopic,
producers => Producers,
resource_id => ResourceId
resource_id => ResourceId,
sync_query_timeout => SyncQueryTimeout
}};
{error, Reason2} ->
?SLOG(error, #{
@ -189,14 +198,16 @@ on_stop(InstanceId, _State) ->
on_query(
_InstId,
{send_message, Message},
#{message_template := Template, producers := Producers}
#{
message_template := Template,
producers := Producers,
sync_query_timeout := SyncTimeout
}
) ->
?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}),
KafkaMessage = render_message(Template, Message),
%% TODO: this function is not used so far,
%% timeout should be configurable
%% or the on_query/3 should be on_query/4 instead.
try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000),
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok
catch
error:{producer_down, _} = Reason ->
@ -217,6 +228,7 @@ on_query_async(
AsyncReplyFn,
#{message_template := Template, producers := Producers}
) ->
?tp(emqx_bridge_kafka_impl_producer_async_query, #{}),
KafkaMessage = render_message(Template, Message),
%% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
%% * Must be a single element batch because wolff books calls, but not batch sizes

View File

@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) ->
ok;
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Test case for the query_mode parameter
%%------------------------------------------------------------------------------
t_query_mode(CtConfig) ->
%% We need this because on_query_async is in a different group
CtConfig1 = [{query_api, none} | CtConfig],
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
end
),
?check_trace(
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end,
fun(RunStageResult, Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end
),
ok.
%%------------------------------------------------------------------------------
%% Test cases for all combinations of SSL, no SSL and authentication types
%%------------------------------------------------------------------------------
@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) ->
ok
end.
publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) ->
publish_helper(
CtConfig,
#{
auth_settings => "none",
ssl_settings => #{}
},
ConfigTemplateParameters
).
publish_with_and_without_ssl(CtConfig, AuthSettings) ->
publish_with_and_without_ssl(CtConfig, AuthSettings, #{}).
@ -537,21 +575,25 @@ publish_helper(
{ok, _} = emqx_bridge:create(
<<?BRIDGE_TYPE>>, list_to_binary(Name), Conf
),
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Partition = 0,
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0),
case proplists:get_value(query_api, CtConfig) of
none ->
ok;
_ ->
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Msg = #{
clientid => BinTime,
payload => <<"payload">>,
timestamp => Time
},
{ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing ~p", [Offset0]),
{ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId),
ok = send(CtConfig, InstId, Msg, State),
{ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0)
end,
%% test that it forwards from local mqtt topic as well
{ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition),
ct:pal("base offset before testing (2) ~p", [Offset1]),
@ -596,13 +638,15 @@ hocon_config(Args) ->
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
SSLConf = maps:get("ssl", Args, #{}),
SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)),
QueryMode = maps:get("query_mode", Args, <<"async">>),
SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf),
Hocon = bbmustache:render(
iolist_to_binary(hocon_config_template()),
Args#{
"authentication" => AuthConfRendered,
"bridge_name" => Name,
"ssl" => SSLConfRendered
"ssl" => SSLConfRendered,
"query_mode" => QueryMode
}
),
Hocon.
@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} {
}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -49,8 +48,6 @@ fields(config) ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{

View File

@ -11,7 +11,7 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
query_mode/1,
on_start/2,
on_stop/2,
on_get_status/2,
@ -70,10 +70,8 @@
callback_mode() -> async_if_possible.
%% there are no queries to be made to this bridge, so we say that
%% buffer is supported so we don't spawn unused resource buffer
%% workers.
is_buffer_supported() -> true.
query_mode(_Config) ->
simple_async.
-spec on_start(resource_id(), config()) -> {ok, state()}.
on_start(InstanceId, Config) ->

View File

@ -34,7 +34,6 @@
%% Optional callbacks
on_get_status/2,
on_query/3,
is_buffer_supported/0,
on_batch_query/3
]).
@ -187,11 +186,6 @@ callback_mode() -> always_sync.
%% emqx_resource callback
-spec is_buffer_supported() -> boolean().
is_buffer_supported() ->
%% We want to make use of EMQX's buffer mechanism
false.
%% emqx_resource callback called when the resource is started
-spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}.

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -86,8 +85,6 @@ servers() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config

View File

@ -30,7 +30,6 @@
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -169,8 +168,6 @@ server() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
ResourceId = PoolName,
#{

View File

@ -17,7 +17,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -79,8 +78,6 @@ server() ->
callback_mode() -> always_sync.
is_buffer_supported() -> false.
on_start(
InstanceId,
#{

View File

@ -18,7 +18,6 @@
%% callbacks for behaviour emqx_resource
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -68,8 +67,6 @@
% be sync for now.
callback_mode() -> always_sync.
is_buffer_supported() -> false.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,

View File

@ -22,7 +22,7 @@
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() :: async | sync | dynamic.
-type query_mode() :: async | sync | simple_async | simple_sync | dynamic.
-type result() :: term().
-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
-type query_opts() :: #{

View File

@ -100,7 +100,8 @@
call_health_check/3,
%% stop the instance
call_stop/3,
is_buffer_supported/1
%% get the query mode of the resource
query_mode/3
]).
%% list all the instances, id only.
@ -132,7 +133,7 @@
on_query_async/4,
on_batch_query_async/4,
on_get_status/2,
is_buffer_supported/0
query_mode/1
]).
%% when calling emqx_resource:start/1
@ -173,7 +174,8 @@
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
-callback is_buffer_supported() -> boolean().
-callback query_mode(Config :: term()) ->
simple_sync | simple_async | sync | async | no_queries.
-spec list_types() -> [module()].
list_types() ->
@ -276,16 +278,20 @@ query(ResId, Request) ->
Result :: term().
query(ResId, Request, Opts) ->
case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{query_mode := QM, mod := Module}} ->
IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of
{true, _} ->
%% only Kafka producer so far
{ok, _Group, #{query_mode := QM}} ->
case QM of
simple_async ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
{false, sync} ->
simple_sync ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
sync ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} ->
async ->
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end;
{error, not_found} ->
@ -367,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
get_callback_mode(Mod) ->
Mod:callback_mode().
-spec is_buffer_supported(module()) -> boolean().
is_buffer_supported(Module) ->
try
Module:is_buffer_supported()
catch
_:_ ->
false
end.
-spec call_start(resource_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
call_start(ResId, Mod, Config) ->
@ -416,6 +413,17 @@ call_stop(ResId, Mod, ResourceState) ->
Res
end).
-spec query_mode(module(), term(), creation_opts()) ->
simple_sync | simple_async | sync | async | no_queries.
query_mode(Mod, Config, Opts) ->
case erlang:function_exported(Mod, query_mode, 1) of
true ->
Mod:query_mode(Config);
false ->
maps:get(query_mode, Opts, sync)
end.
-spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}.
check_config(ResourceType, Conf) ->

View File

@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) ->
],
[matched]
),
case emqx_resource:is_buffer_supported(ResourceType) of
true ->
%% the resource it self supports
%% buffer, so there is no need for resource workers
QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
case QueryMode of
%% the resource has built-in buffer, so there is no need for resource workers
simple_sync ->
ok;
false ->
simple_async ->
ok;
%% The resource is a consumer resource, so there is no need for resource workers
no_queries ->
ok;
_ ->
%% start resource workers as the query type requires them
ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true ->
@ -288,16 +294,17 @@ health_check(ResId) ->
%% @doc Function called from the supervisor to actually start the server
start_link(ResId, Group, ResourceType, Config, Opts) ->
QueryMode = emqx_resource:query_mode(
ResourceType,
Config,
Opts
),
Data = #data{
id = ResId,
group = Group,
mod = ResourceType,
callback_mode = emqx_resource:get_callback_mode(ResourceType),
%% query_mode = dynamic | sync | async
%% TODO:
%% dynamic mode is async mode when things are going well, but becomes sync mode
%% if the resource worker is overloaded
query_mode = maps:get(query_mode, Opts, sync),
query_mode = QueryMode,
config = Config,
opts = Opts,
state = undefined,

View File

@ -0,0 +1 @@
A query_mode parameter has been added to the Kafka producer bridge. This parameter allows you to specify if the bridge should use the asynchronous or synchronous mode when sending data to Kafka. The default is asynchronous mode.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_connector, [
{description, "EMQX Enterprise connectors"},
{vsn, "0.1.13"},
{vsn, "0.1.14"},
{registered, []},
{applications, [
kernel,

View File

@ -15,7 +15,6 @@
%% `emqx_resource' API
-export([
callback_mode/0,
is_buffer_supported/0,
on_start/2,
on_stop/2,
on_query/3,
@ -28,8 +27,6 @@
callback_mode() -> emqx_connector_mongo:callback_mode().
is_buffer_supported() -> false.
on_start(InstanceId, Config) ->
case emqx_connector_mongo:on_start(InstanceId, Config) of
{ok, ConnectorState} ->

View File

@ -358,4 +358,16 @@ compression.desc:
compression.label:
"""Compression"""
query_mode.desc:
"""Query mode. Optional 'sync/async', default 'async'."""
query_mode.label:
"""Query mode"""
sync_query_timeout.desc:
"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'."""
sync_query_timeout.label:
"""Synchronous Query Timeout"""
}