From 47fa17b3c17344fa14bd47a3532af4e39c297b15 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 7 Jun 2023 20:00:17 +0200 Subject: [PATCH 1/8] feat: add sync/async option to the Kafka producer bridge This commit makes it possible to configure if a Kafka bridge should work in query mode sync or async by setting the resource_opts.query_mode configuration option. Fixes: https://emqx.atlassian.net/browse/EMQX-8631 --- .../src/emqx_bridge_kafka.erl | 37 +++++++++++++++++-- .../src/emqx_bridge_kafka_impl_producer.erl | 20 ++++++---- apps/emqx_resource/src/emqx_resource.erl | 9 ++++- rel/i18n/emqx_bridge_kafka.hocon | 24 ++++++++++++ 4 files changed, 78 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 64f2394c4..9cabaf5e9 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -147,7 +147,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - fields("config") ++ fields(producer_opts); + fields("config") ++ fields(resource_opts) ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -294,7 +294,37 @@ fields(producer_kafka_opts) -> mk(ref(producer_buffer), #{ required => false, desc => ?DESC(producer_buffer) - })} + })}, + {query_mode_sync_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(query_mode_sync_timeout) + } + )} + ]; +fields(resource_opts) -> + [ + {resource_opts, + mk( + ref(?MODULE, resource_opts_fields), + #{ + required => false, + desc => ?DESC(resource_opts) + } + )} + ]; +fields(resource_opts_fields) -> + [ + {query_mode, + mk( + enum([async, sync]), + #{ + default => async, + desc => ?DESC(query_mode) + } + )} ]; fields(kafka_message) -> [ @@ -410,7 +440,8 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_topic_mapping + consumer_topic_mapping, + resource_opts_fields ]. %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..8be506efd 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -43,7 +43,11 @@ on_start(InstId, Config) -> bootstrap_hosts := Hosts0, bridge_name := BridgeName, connect_timeout := ConnTimeout, - kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic}, + kafka := KafkaConfig = #{ + message := MessageTemplate, + topic := KafkaTopic, + query_mode_sync_timeout := QueryModeSyncTimeout + }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, @@ -99,7 +103,8 @@ on_start(InstId, Config) -> client_id => ClientId, kafka_topic => KafkaTopic, producers => Producers, - resource_id => ResourceId + resource_id => ResourceId, + query_mode_sync_timeout => QueryModeSyncTimeout }}; {error, Reason2} -> ?SLOG(error, #{ @@ -189,14 +194,15 @@ on_stop(InstanceId, _State) -> on_query( _InstId, {send_message, Message}, - #{message_template := Template, producers := Producers} + #{ + message_template := Template, + producers := Producers, + query_mode_sync_timeout := SyncTimeout + } ) -> KafkaMessage = render_message(Template, Message), - %% TODO: this function is not used so far, - %% timeout should be configurable - %% or the on_query/3 should be on_query/4 instead. try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000), + {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), ok catch error:{producer_down, _} = Reason -> diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0dbc3067f..c700cfd86 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -276,13 +276,13 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, mod := Module}} -> + {ok, _Group, #{query_mode := QM, mod := Module} = Config} -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of {true, _} -> %% only Kafka producer so far Opts1 = Opts#{is_buffer_supported => true}, - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + do_query_built_in_buffer(QM, ResId, Request, Opts1); {false, sync} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); {false, async} -> @@ -292,6 +292,11 @@ query(ResId, Request, Opts) -> ?RESOURCE_ERROR(not_found, "resource not found") end. +do_query_built_in_buffer(async, ResId, Request, Opts1) -> + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); +do_query_built_in_buffer(sync, ResId, Request, _Opts1) -> + emqx_resource_buffer_worker:simple_sync_query(ResId, Request). + -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> emqx_resource_buffer_worker:simple_sync_query(ResId, Request). diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index ef2e27972..740d94984 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -358,4 +358,28 @@ compression.desc: compression.label: """Compression""" +query_mode.desc: +"""Query mode. Optional 'sync/async', default 'async'.""" + +query_mode.label: +"""Query mode""" + +resource_opts.desc: +"""Resource options.""" + +resource_opts.label: +"""Resource Options""" + +resource_opts_fields.desc: +"""Resource options.""" + +resource_opts_fields.label: +"""Resource Options""" + +query_mode_sync_timeout.desc: +"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'.""" + +query_mode_sync_timeout.label: +"""Synchronous Query Timeout""" + } From ed9e29e769a50164df9e22817200925476da5af5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 00:57:42 +0200 Subject: [PATCH 2/8] refactor: refacor query_mode detection code This commit refactor the query_mode resource detection code according to a suggestion from @zmstone. This commit should not contain any functional change except for a change of the Kafka producer bridge config. --- .../src/emqx_bridge_dynamo_connector.erl | 3 -- .../src/emqx_bridge_gcp_pubsub_connector.erl | 5 +- .../src/emqx_bridge_kafka.erl | 35 ++++-------- .../src/emqx_bridge_kafka_impl_consumer.erl | 10 ++-- .../src/emqx_bridge_kafka_impl_producer.erl | 8 ++- .../src/emqx_bridge_opents_connector.erl | 3 -- .../src/emqx_bridge_pulsar_impl_producer.erl | 9 ++-- .../src/emqx_bridge_rabbitmq_connector.erl | 6 --- .../src/emqx_bridge_rocketmq_connector.erl | 3 -- .../src/emqx_bridge_sqlserver_connector.erl | 3 -- .../src/emqx_bridge_tdengine_connector.erl | 3 -- apps/emqx_oracle/src/emqx_oracle.erl | 3 -- apps/emqx_resource/src/emqx_resource.erl | 53 ++++++++++--------- .../src/emqx_resource_manager.erl | 30 +++++++---- .../src/emqx_ee_connector.app.src | 2 +- .../src/emqx_ee_connector_mongodb.erl | 3 -- 16 files changed, 74 insertions(+), 105 deletions(-) diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0cc3f993b..214d6b642 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -64,8 +63,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 65a0336ec..18cfca1d9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -22,8 +22,7 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2, - is_buffer_supported/0 + on_get_status/2 ]). -export([reply_delegator/3]). @@ -56,8 +55,6 @@ %% emqx_resource API %%------------------------------------------------------------------------------------------------- -is_buffer_supported() -> false. - callback_mode() -> async_if_possible. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9cabaf5e9..54cddb735 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -147,7 +147,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - fields("config") ++ fields(resource_opts) ++ fields(producer_opts); + fields("config") ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -295,28 +295,6 @@ fields(producer_kafka_opts) -> required => false, desc => ?DESC(producer_buffer) })}, - {query_mode_sync_timeout, - mk( - emqx_schema:duration_ms(), - #{ - default => <<"5s">>, - desc => ?DESC(query_mode_sync_timeout) - } - )} - ]; -fields(resource_opts) -> - [ - {resource_opts, - mk( - ref(?MODULE, resource_opts_fields), - #{ - required => false, - desc => ?DESC(resource_opts) - } - )} - ]; -fields(resource_opts_fields) -> - [ {query_mode, mk( enum([async, sync]), @@ -324,6 +302,14 @@ fields(resource_opts_fields) -> default => async, desc => ?DESC(query_mode) } + )}, + {query_mode_sync_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(query_mode_sync_timeout) + } )} ]; fields(kafka_message) -> @@ -440,8 +426,7 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_topic_mapping, - resource_opts_fields + consumer_topic_mapping ]. %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..5f59b4756 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -8,7 +8,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2 @@ -112,11 +112,9 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> - true. +%% consumer bridges don't need resource workers +query_mode(_Config) -> + no_queries. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(ResourceId, Config) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8be506efd..eee0cdd99 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -4,10 +4,11 @@ -module(emqx_bridge_kafka_impl_producer). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %% callbacks of behaviour emqx_resource -export([ - is_buffer_supported/0, + query_mode/1, callback_mode/0, on_start/2, on_stop/2, @@ -32,7 +33,10 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). -is_buffer_supported() -> true. +query_mode(#{kafka := #{query_mode := sync}}) -> + simple_sync; +query_mode(_) -> + simple_async. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 71184e872..887031e2e 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -49,8 +48,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b8157d4fc..1d93ffd0d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -11,7 +11,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2, @@ -70,10 +70,9 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> true. +%% consumer bridges don't need resource workers +query_mode(_Config) -> + no_queries. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 0dc73b5f6..251158029 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -34,7 +34,6 @@ %% Optional callbacks on_get_status/2, on_query/3, - is_buffer_supported/0, on_batch_query/3 ]). @@ -187,11 +186,6 @@ callback_mode() -> always_sync. %% emqx_resource callback --spec is_buffer_supported() -> boolean(). -is_buffer_supported() -> - %% We want to make use of EMQX's buffer mechanism - false. - %% emqx_resource callback called when the resource is started -spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..d9a9eae3b 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -86,8 +85,6 @@ servers() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..d814e6205 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,7 +30,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -169,8 +168,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( ResourceId = PoolName, #{ diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 8fd41443c..97823e0d7 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -79,8 +78,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 0f543badd..ed023ef30 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -18,7 +18,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -68,8 +67,6 @@ % be sync for now. callback_mode() -> always_sync. -is_buffer_supported() -> false. - -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c700cfd86..2f43fd926 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -100,7 +100,8 @@ call_health_check/3, %% stop the instance call_stop/3, - is_buffer_supported/1 + %% get the query mode of the resource + query_mode/3 ]). %% list all the instances, id only. @@ -132,7 +133,7 @@ on_query_async/4, on_batch_query_async/4, on_get_status/2, - is_buffer_supported/0 + query_mode/1 ]). %% when calling emqx_resource:start/1 @@ -173,7 +174,8 @@ | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. --callback is_buffer_supported() -> boolean(). +-callback query_mode(Config :: term()) -> + simple_sync | simple_async | sync | async | no_queries. -spec list_types() -> [module()]. list_types() -> @@ -276,27 +278,26 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, mod := Module} = Config} -> - IsBufferSupported = is_buffer_supported(Module), - case {IsBufferSupported, QM} of - {true, _} -> - %% only Kafka producer so far + {ok, _Group, #{query_mode := QM}} -> + case QM of + simple_async -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again Opts1 = Opts#{is_buffer_supported => true}, - do_query_built_in_buffer(QM, ResId, Request, Opts1); - {false, sync} -> + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + simple_sync -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_query(ResId, Request); + sync -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - {false, async} -> + async -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. -do_query_built_in_buffer(async, ResId, Request, Opts1) -> - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); -do_query_built_in_buffer(sync, ResId, Request, _Opts1) -> - emqx_resource_buffer_worker:simple_sync_query(ResId, Request). - -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> emqx_resource_buffer_worker:simple_sync_query(ResId, Request). @@ -372,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). --spec is_buffer_supported(module()) -> boolean(). -is_buffer_supported(Module) -> - try - Module:is_buffer_supported() - catch - _:_ -> - false - end. - -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -417,6 +409,17 @@ call_stop(ResId, Mod, ResourceState) -> Res end). +-spec query_mode(module(), term(), creation_opts()) -> + simple_sync | simple_async | sync | async | no_queries. + +query_mode(Mod, Config, Opts) -> + case erlang:function_exported(Mod, query_mode, 1) of + true -> + Mod:query_mode(Config); + false -> + maps:get(query_mode, Opts, sync) + end. + -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 02dda8021..be58bf0e0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) -> ], [matched] ), - case emqx_resource:is_buffer_supported(ResourceType) of - true -> - %% the resource it self supports - %% buffer, so there is no need for resource workers + QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), + case QueryMode of + %% the resource has built-in buffer, so there is no need for resource workers + simple_sync -> ok; - false -> + simple_async -> + ok; + %% The resource is a consumer resource, so there is no need for resource workers + no_queries -> + ok; + _ -> + %% start resource workers as the query type requires them ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> @@ -288,16 +294,20 @@ health_check(ResId) -> %% @doc Function called from the supervisor to actually start the server start_link(ResId, Group, ResourceType, Config, Opts) -> + QueryMode = + case erlang:function_exported(ResourceType, query_mode, 1) of + true -> + ResourceType:query_mode(Config); + false -> + maps:get(query_mode, Opts, sync) + end, + Data = #data{ id = ResId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), - %% query_mode = dynamic | sync | async - %% TODO: - %% dynamic mode is async mode when things are going well, but becomes sync mode - %% if the resource worker is overloaded - query_mode = maps:get(query_mode, Opts, sync), + query_mode = QueryMode, config = Config, opts = Opts, state = undefined, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 3ed460492..702b9ff09 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 59f763904..0c5a839e9 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -15,7 +15,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -28,8 +27,6 @@ callback_mode() -> emqx_connector_mongo:callback_mode(). -is_buffer_supported() -> false. - on_start(InstanceId, Config) -> case emqx_connector_mongo:on_start(InstanceId, Config) of {ok, ConnectorState} -> From 6f2271e9f0384d88f148d64051b28434f29b3b36 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 15:43:41 +0200 Subject: [PATCH 3/8] test: add Kafka producer bridge test case for query_mode parameter --- .../src/emqx_bridge_kafka_impl_producer.erl | 2 + .../emqx_bridge_kafka_impl_producer_SUITE.erl | 75 +++++++++++++++---- rel/i18n/emqx_bridge_kafka.hocon | 12 --- 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index eee0cdd99..b5799a91a 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -204,6 +204,7 @@ on_query( query_mode_sync_timeout := SyncTimeout } ) -> + ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}), KafkaMessage = render_message(Template, Message), try {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), @@ -227,6 +228,7 @@ on_query_async( AsyncReplyFn, #{message_template := Template, producers := Producers} ) -> + ?tp(emqx_bridge_kafka_impl_producer_async_query, #{}), KafkaMessage = render_message(Template, Message), %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 378dda543..e52f5b07b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) -> ok; set_special_configs(_) -> ok. + +%%------------------------------------------------------------------------------ +%% Test case for the query_mode parameter +%%------------------------------------------------------------------------------ + +t_query_mode(CtConfig) -> + %% We need this because on_query_async is in a different group + CtConfig1 = [{query_api, none} | CtConfig], + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) + end + ), + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) + end + ), + ok. + %%------------------------------------------------------------------------------ %% Test cases for all combinations of SSL, no SSL and authentication types %%------------------------------------------------------------------------------ @@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) -> ok end. +publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) -> + publish_helper( + CtConfig, + #{ + auth_settings => "none", + ssl_settings => #{} + }, + ConfigTemplateParameters + ). + publish_with_and_without_ssl(CtConfig, AuthSettings) -> publish_with_and_without_ssl(CtConfig, AuthSettings, #{}). @@ -537,21 +575,25 @@ publish_helper( {ok, _} = emqx_bridge:create( <>, list_to_binary(Name), Conf ), - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), Partition = 0, - Msg = #{ - clientid => BinTime, - payload => <<"payload">>, - timestamp => Time - }, - {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), - ct:pal("base offset before testing ~p", [Offset0]), - {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), - ok = send(CtConfig, InstId, Msg, State), - {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), - ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0), - + case proplists:get_value(query_api, CtConfig) of + none -> + ok; + _ -> + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), + ct:pal("base offset before testing ~p", [Offset0]), + {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), + ok = send(CtConfig, InstId, Msg, State), + {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0) + end, %% test that it forwards from local mqtt topic as well {ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), ct:pal("base offset before testing (2) ~p", [Offset1]), @@ -596,13 +638,15 @@ hocon_config(Args) -> AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), SSLConf = maps:get("ssl", Args, #{}), SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), + QueryMode = maps:get("query_mode", Args, <<"async">>), SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), Hocon = bbmustache:render( iolist_to_binary(hocon_config_template()), Args#{ "authentication" => AuthConfRendered, "bridge_name" => Name, - "ssl" => SSLConfRendered + "ssl" => SSLConfRendered, + "query_mode" => QueryMode } ), Hocon. @@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} { } partition_strategy = {{ partition_strategy }} topic = \"{{ kafka_topic }}\" + query_mode = {{ query_mode }} } metadata_request_timeout = 5s min_metadata_refresh_interval = 3s diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 740d94984..86acd81fe 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -364,18 +364,6 @@ query_mode.desc: query_mode.label: """Query mode""" -resource_opts.desc: -"""Resource options.""" - -resource_opts.label: -"""Resource Options""" - -resource_opts_fields.desc: -"""Resource options.""" - -resource_opts_fields.label: -"""Resource Options""" - query_mode_sync_timeout.desc: """This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'.""" From d524f8c805cfc7b64031906d7d28c85790593eb1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 15:58:19 +0200 Subject: [PATCH 4/8] refactor: rename config parameter --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 4 ++-- .../src/emqx_bridge_kafka_impl_producer.erl | 6 +++--- rel/i18n/emqx_bridge_kafka.hocon | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 54cddb735..5f0dcee72 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -303,12 +303,12 @@ fields(producer_kafka_opts) -> desc => ?DESC(query_mode) } )}, - {query_mode_sync_timeout, + {sync_query_timeout, mk( emqx_schema:duration_ms(), #{ default => <<"5s">>, - desc => ?DESC(query_mode_sync_timeout) + desc => ?DESC(sync_query_timeout) } )} ]; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index b5799a91a..563e7dd46 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -50,7 +50,7 @@ on_start(InstId, Config) -> kafka := KafkaConfig = #{ message := MessageTemplate, topic := KafkaTopic, - query_mode_sync_timeout := QueryModeSyncTimeout + sync_query_timeout := SyncQueryTimeout }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, @@ -108,7 +108,7 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, producers => Producers, resource_id => ResourceId, - query_mode_sync_timeout => QueryModeSyncTimeout + sync_query_timeout => SyncQueryTimeout }}; {error, Reason2} -> ?SLOG(error, #{ @@ -201,7 +201,7 @@ on_query( #{ message_template := Template, producers := Producers, - query_mode_sync_timeout := SyncTimeout + sync_query_timeout := SyncTimeout } ) -> ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}), diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 86acd81fe..fcb9b0074 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -166,7 +166,7 @@ consumer_offset_reset_policy.label: partition_count_refresh_interval.desc: """The time interval for Kafka producer to discover increased number of partitions. -After the number of partitions is increased in Kafka, EMQX will start taking the +After the number of partitions is increased in Kafka, EMQX will start taking the discovered partitions into account when dispatching messages per partition_strategy.""" partition_count_refresh_interval.label: @@ -364,10 +364,10 @@ query_mode.desc: query_mode.label: """Query mode""" -query_mode_sync_timeout.desc: +sync_query_timeout.desc: """This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'.""" -query_mode_sync_timeout.label: +sync_query_timeout.label: """Synchronous Query Timeout""" } From b60dbbc792dc85a961d2f64f156d997445a41222 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 16:04:50 +0200 Subject: [PATCH 5/8] docs: add change log entry for Kafka query mode parameter --- changes/ee/feat-10970.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-10970.en.md diff --git a/changes/ee/feat-10970.en.md b/changes/ee/feat-10970.en.md new file mode 100644 index 000000000..d716f6eef --- /dev/null +++ b/changes/ee/feat-10970.en.md @@ -0,0 +1 @@ +A query_mode parameter has been added to the Kafka producer bridge. This parameter allows you to specify if the bridge should use the asynchronous or synchronous mode when sending data to Kafka. The default is asynchronous mode. From cb3a5fdbd4423b97871757d8e16df0122f392954 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 16:16:23 +0200 Subject: [PATCH 6/8] style: only callback modules should do dynamic calls --- apps/emqx_resource/src/emqx_resource_manager.erl | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index be58bf0e0..c1adb8ecd 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -294,14 +294,11 @@ health_check(ResId) -> %% @doc Function called from the supervisor to actually start the server start_link(ResId, Group, ResourceType, Config, Opts) -> - QueryMode = - case erlang:function_exported(ResourceType, query_mode, 1) of - true -> - ResourceType:query_mode(Config); - false -> - maps:get(query_mode, Opts, sync) - end, - + QueryMode = emqx_resource:query_mode( + ResourceType, + Config, + Opts + ), Data = #data{ id = ResId, group = Group, From 1c7834e0565bd224b489669df682b593ede98590 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Jun 2023 16:47:02 +0200 Subject: [PATCH 7/8] fix: fixes due to comments from @zmstone --- apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl | 2 +- .../src/emqx_bridge_pulsar_impl_producer.erl | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5f0dcee72..31307fd16 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -305,7 +305,7 @@ fields(producer_kafka_opts) -> )}, {sync_query_timeout, mk( - emqx_schema:duration_ms(), + emqx_schema:timeout_duration_ms(), #{ default => <<"5s">>, desc => ?DESC(sync_query_timeout) diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 1d93ffd0d..0dcf1bf66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -70,9 +70,8 @@ callback_mode() -> async_if_possible. -%% consumer bridges don't need resource workers query_mode(_Config) -> - no_queries. + simple_async. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> From 2671e8ecf9f35f7ab0ff774ca6e482996762ce3a Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 9 Jun 2023 11:00:05 +0200 Subject: [PATCH 8/8] fix: dialyzer type problem --- apps/emqx_resource/include/emqx_resource.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 562e18d52..8e17ac15c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,7 +22,7 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. --type query_mode() :: async | sync | dynamic. +-type query_mode() :: async | sync | simple_async | simple_sync | dynamic. -type result() :: term(). -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type query_opts() :: #{