diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 64f2394c4..9cabaf5e9 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -147,7 +147,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - fields("config") ++ fields(producer_opts); + fields("config") ++ fields(resource_opts) ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -294,7 +294,37 @@ fields(producer_kafka_opts) -> mk(ref(producer_buffer), #{ required => false, desc => ?DESC(producer_buffer) - })} + })}, + {query_mode_sync_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(query_mode_sync_timeout) + } + )} + ]; +fields(resource_opts) -> + [ + {resource_opts, + mk( + ref(?MODULE, resource_opts_fields), + #{ + required => false, + desc => ?DESC(resource_opts) + } + )} + ]; +fields(resource_opts_fields) -> + [ + {query_mode, + mk( + enum([async, sync]), + #{ + default => async, + desc => ?DESC(query_mode) + } + )} ]; fields(kafka_message) -> [ @@ -410,7 +440,8 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_topic_mapping + consumer_topic_mapping, + resource_opts_fields ]. %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..8be506efd 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -43,7 +43,11 @@ on_start(InstId, Config) -> bootstrap_hosts := Hosts0, bridge_name := BridgeName, connect_timeout := ConnTimeout, - kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic}, + kafka := KafkaConfig = #{ + message := MessageTemplate, + topic := KafkaTopic, + query_mode_sync_timeout := QueryModeSyncTimeout + }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, @@ -99,7 +103,8 @@ on_start(InstId, Config) -> client_id => ClientId, kafka_topic => KafkaTopic, producers => Producers, - resource_id => ResourceId + resource_id => ResourceId, + query_mode_sync_timeout => QueryModeSyncTimeout }}; {error, Reason2} -> ?SLOG(error, #{ @@ -189,14 +194,15 @@ on_stop(InstanceId, _State) -> on_query( _InstId, {send_message, Message}, - #{message_template := Template, producers := Producers} + #{ + message_template := Template, + producers := Producers, + query_mode_sync_timeout := SyncTimeout + } ) -> KafkaMessage = render_message(Template, Message), - %% TODO: this function is not used so far, - %% timeout should be configurable - %% or the on_query/3 should be on_query/4 instead. try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000), + {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), ok catch error:{producer_down, _} = Reason -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0dbc3067f..c700cfd86 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -276,13 +276,13 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, mod := Module}} -> + {ok, _Group, #{query_mode := QM, mod := Module} = Config} -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> %% only Kafka producer so far Opts1 = Opts#{is_buffer_supported => true}, - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + do_query_built_in_buffer(QM, ResId, Request, Opts1); {false, sync} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> @@ -292,6 +292,11 @@ query(ResId, Request, Opts) -> ?RESOURCE_ERROR(not_found, "resource not found") end. +do_query_built_in_buffer(async, ResId, Request, Opts1) -> + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); +do_query_built_in_buffer(sync, ResId, Request, _Opts1) -> + emqx_resource_buffer_worker:simple_sync_query(ResId, Request). + -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> emqx_resource_buffer_worker:simple_sync_query(ResId, Request). diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index ef2e27972..740d94984 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -358,4 +358,28 @@ compression.desc: compression.label: """Compression""" +query_mode.desc: +"""Query mode. Optional 'sync/async', default 'async'.""" + +query_mode.label: +"""Query mode""" + +resource_opts.desc: +"""Resource options.""" + +resource_opts.label: +"""Resource Options""" + +resource_opts_fields.desc: +"""Resource options.""" + +resource_opts_fields.label: +"""Resource Options""" + +query_mode_sync_timeout.desc: +"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'.""" + +query_mode_sync_timeout.label: +"""Synchronous Query Timeout""" + }