feat: add sync/async option to the Kafka producer bridge

This commit makes it possible to configure if a Kafka bridge should work
in query mode sync or async by setting the resource_opts.query_mode
configuration option.

Fixes:
https://emqx.atlassian.net/browse/EMQX-8631
This commit is contained in:
Kjell Winblad 2023-06-07 20:00:17 +02:00
parent b481994e74
commit 47fa17b3c1
4 changed files with 78 additions and 12 deletions

View File

@ -147,7 +147,7 @@ fields("config_producer") ->
fields("config_consumer") -> fields("config_consumer") ->
fields(kafka_consumer); fields(kafka_consumer);
fields(kafka_producer) -> fields(kafka_producer) ->
fields("config") ++ fields(producer_opts); fields("config") ++ fields(resource_opts) ++ fields(producer_opts);
fields(kafka_consumer) -> fields(kafka_consumer) ->
fields("config") ++ fields(consumer_opts); fields("config") ++ fields(consumer_opts);
fields("config") -> fields("config") ->
@ -294,7 +294,37 @@ fields(producer_kafka_opts) ->
mk(ref(producer_buffer), #{ mk(ref(producer_buffer), #{
required => false, required => false,
desc => ?DESC(producer_buffer) desc => ?DESC(producer_buffer)
})} })},
{query_mode_sync_timeout,
mk(
emqx_schema:duration_ms(),
#{
default => <<"5s">>,
desc => ?DESC(query_mode_sync_timeout)
}
)}
];
fields(resource_opts) ->
[
{resource_opts,
mk(
ref(?MODULE, resource_opts_fields),
#{
required => false,
desc => ?DESC(resource_opts)
}
)}
];
fields(resource_opts_fields) ->
[
{query_mode,
mk(
enum([async, sync]),
#{
default => async,
desc => ?DESC(query_mode)
}
)}
]; ];
fields(kafka_message) -> fields(kafka_message) ->
[ [
@ -410,7 +440,8 @@ struct_names() ->
producer_opts, producer_opts,
consumer_opts, consumer_opts,
consumer_kafka_opts, consumer_kafka_opts,
consumer_topic_mapping consumer_topic_mapping,
resource_opts_fields
]. ].
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------

View File

@ -43,7 +43,11 @@ on_start(InstId, Config) ->
bootstrap_hosts := Hosts0, bootstrap_hosts := Hosts0,
bridge_name := BridgeName, bridge_name := BridgeName,
connect_timeout := ConnTimeout, connect_timeout := ConnTimeout,
kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic}, kafka := KafkaConfig = #{
message := MessageTemplate,
topic := KafkaTopic,
query_mode_sync_timeout := QueryModeSyncTimeout
},
metadata_request_timeout := MetaReqTimeout, metadata_request_timeout := MetaReqTimeout,
min_metadata_refresh_interval := MinMetaRefreshInterval, min_metadata_refresh_interval := MinMetaRefreshInterval,
socket_opts := SocketOpts, socket_opts := SocketOpts,
@ -99,7 +103,8 @@ on_start(InstId, Config) ->
client_id => ClientId, client_id => ClientId,
kafka_topic => KafkaTopic, kafka_topic => KafkaTopic,
producers => Producers, producers => Producers,
resource_id => ResourceId resource_id => ResourceId,
query_mode_sync_timeout => QueryModeSyncTimeout
}}; }};
{error, Reason2} -> {error, Reason2} ->
?SLOG(error, #{ ?SLOG(error, #{
@ -189,14 +194,15 @@ on_stop(InstanceId, _State) ->
on_query( on_query(
_InstId, _InstId,
{send_message, Message}, {send_message, Message},
#{message_template := Template, producers := Producers} #{
message_template := Template,
producers := Producers,
query_mode_sync_timeout := SyncTimeout
}
) -> ) ->
KafkaMessage = render_message(Template, Message), KafkaMessage = render_message(Template, Message),
%% TODO: this function is not used so far,
%% timeout should be configurable
%% or the on_query/3 should be on_query/4 instead.
try try
{_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000), {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
ok ok
catch catch
error:{producer_down, _} = Reason -> error:{producer_down, _} = Reason ->

View File

@ -276,13 +276,13 @@ query(ResId, Request) ->
Result :: term(). Result :: term().
query(ResId, Request, Opts) -> query(ResId, Request, Opts) ->
case emqx_resource_manager:lookup_cached(ResId) of case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{query_mode := QM, mod := Module}} -> {ok, _Group, #{query_mode := QM, mod := Module} = Config} ->
IsBufferSupported = is_buffer_supported(Module), IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of case {IsBufferSupported, QM} of
{true, _} -> {true, _} ->
%% only Kafka producer so far %% only Kafka producer so far
Opts1 = Opts#{is_buffer_supported => true}, Opts1 = Opts#{is_buffer_supported => true},
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); do_query_built_in_buffer(QM, ResId, Request, Opts1);
{false, sync} -> {false, sync} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{false, async} -> {false, async} ->
@ -292,6 +292,11 @@ query(ResId, Request, Opts) ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")
end. end.
do_query_built_in_buffer(async, ResId, Request, Opts1) ->
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1);
do_query_built_in_buffer(sync, ResId, Request, _Opts1) ->
emqx_resource_buffer_worker:simple_sync_query(ResId, Request).
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
simple_sync_query(ResId, Request) -> simple_sync_query(ResId, Request) ->
emqx_resource_buffer_worker:simple_sync_query(ResId, Request). emqx_resource_buffer_worker:simple_sync_query(ResId, Request).

View File

@ -358,4 +358,28 @@ compression.desc:
compression.label: compression.label:
"""Compression""" """Compression"""
query_mode.desc:
"""Query mode. Optional 'sync/async', default 'async'."""
query_mode.label:
"""Query mode"""
resource_opts.desc:
"""Resource options."""
resource_opts.label:
"""Resource Options"""
resource_opts_fields.desc:
"""Resource options."""
resource_opts_fields.label:
"""Resource Options"""
query_mode_sync_timeout.desc:
"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'."""
query_mode_sync_timeout.label:
"""Synchronous Query Timeout"""
} }