diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0cc3f993b..214d6b642 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -64,8 +63,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 65a0336ec..18cfca1d9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -22,8 +22,7 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2, - is_buffer_supported/0 + on_get_status/2 ]). -export([reply_delegator/3]). @@ -56,8 +55,6 @@ %% emqx_resource API %%------------------------------------------------------------------------------------------------- -is_buffer_supported() -> false. - callback_mode() -> async_if_possible. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 64f2394c4..31307fd16 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -294,7 +294,23 @@ fields(producer_kafka_opts) -> mk(ref(producer_buffer), #{ required => false, desc => ?DESC(producer_buffer) - })} + })}, + {query_mode, + mk( + enum([async, sync]), + #{ + default => async, + desc => ?DESC(query_mode) + } + )}, + {sync_query_timeout, + mk( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(sync_query_timeout) + } + )} ]; fields(kafka_message) -> [ diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..5f59b4756 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -8,7 +8,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2 @@ -112,11 +112,9 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> - true. +%% consumer bridges don't need resource workers +query_mode(_Config) -> + no_queries. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(ResourceId, Config) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b8337b09..563e7dd46 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -4,10 +4,11 @@ -module(emqx_bridge_kafka_impl_producer). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %% callbacks of behaviour emqx_resource -export([ - is_buffer_supported/0, + query_mode/1, callback_mode/0, on_start/2, on_stop/2, @@ -32,7 +33,10 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). -is_buffer_supported() -> true. +query_mode(#{kafka := #{query_mode := sync}}) -> + simple_sync; +query_mode(_) -> + simple_async. callback_mode() -> async_if_possible. @@ -43,7 +47,11 @@ on_start(InstId, Config) -> bootstrap_hosts := Hosts0, bridge_name := BridgeName, connect_timeout := ConnTimeout, - kafka := KafkaConfig = #{message := MessageTemplate, topic := KafkaTopic}, + kafka := KafkaConfig = #{ + message := MessageTemplate, + topic := KafkaTopic, + sync_query_timeout := SyncQueryTimeout + }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, @@ -99,7 +107,8 @@ on_start(InstId, Config) -> client_id => ClientId, kafka_topic => KafkaTopic, producers => Producers, - resource_id => ResourceId + resource_id => ResourceId, + sync_query_timeout => SyncQueryTimeout }}; {error, Reason2} -> ?SLOG(error, #{ @@ -189,14 +198,16 @@ on_stop(InstanceId, _State) -> on_query( _InstId, {send_message, Message}, - #{message_template := Template, producers := Producers} + #{ + message_template := Template, + producers := Producers, + sync_query_timeout := SyncTimeout + } ) -> + ?tp(emqx_bridge_kafka_impl_producer_sync_query, #{}), KafkaMessage = render_message(Template, Message), - %% TODO: this function is not used so far, - %% timeout should be configurable - %% or the on_query/3 should be on_query/4 instead. try - {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], 5000), + {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout), ok catch error:{producer_down, _} = Reason -> @@ -217,6 +228,7 @@ on_query_async( AsyncReplyFn, #{message_template := Template, producers := Producers} ) -> + ?tp(emqx_bridge_kafka_impl_producer_async_query, #{}), KafkaMessage = render_message(Template, Message), %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs %% * Must be a single element batch because wolff books calls, but not batch sizes diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 378dda543..e52f5b07b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -120,6 +120,34 @@ set_special_configs(emqx_dashboard) -> ok; set_special_configs(_) -> ok. + +%%------------------------------------------------------------------------------ +%% Test case for the query_mode parameter +%%------------------------------------------------------------------------------ + +t_query_mode(CtConfig) -> + %% We need this because on_query_async is in a different group + CtConfig1 = [{query_api, none} | CtConfig], + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace)) + end + ), + ?check_trace( + begin + publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"}) + end, + fun(RunStageResult, Trace) -> + %% We should have a sync Snabbkaffe trace + ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace)) + end + ), + ok. + %%------------------------------------------------------------------------------ %% Test cases for all combinations of SSL, no SSL and authentication types %%------------------------------------------------------------------------------ @@ -473,6 +501,16 @@ do_send(Ref, Config, ResourceId, Msg, State) when is_list(Config) -> ok end. +publish_with_config_template_parameters(CtConfig, ConfigTemplateParameters) -> + publish_helper( + CtConfig, + #{ + auth_settings => "none", + ssl_settings => #{} + }, + ConfigTemplateParameters + ). + publish_with_and_without_ssl(CtConfig, AuthSettings) -> publish_with_and_without_ssl(CtConfig, AuthSettings, #{}). @@ -537,21 +575,25 @@ publish_helper( {ok, _} = emqx_bridge:create( <>, list_to_binary(Name), Conf ), - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), Partition = 0, - Msg = #{ - clientid => BinTime, - payload => <<"payload">>, - timestamp => Time - }, - {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), - ct:pal("base offset before testing ~p", [Offset0]), - {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), - ok = send(CtConfig, InstId, Msg, State), - {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), - ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0), - + case proplists:get_value(query_api, CtConfig) of + none -> + ok; + _ -> + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset0} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), + ct:pal("base offset before testing ~p", [Offset0]), + {ok, _Group, #{state := State}} = emqx_resource:get_instance(InstId), + ok = send(CtConfig, InstId, Msg, State), + {ok, {_, [KafkaMsg0]}} = brod:fetch(kafka_hosts(), KafkaTopic, Partition, Offset0), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0) + end, %% test that it forwards from local mqtt topic as well {ok, Offset1} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, Partition), ct:pal("base offset before testing (2) ~p", [Offset1]), @@ -596,13 +638,15 @@ hocon_config(Args) -> AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), SSLConf = maps:get("ssl", Args, #{}), SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), + QueryMode = maps:get("query_mode", Args, <<"async">>), SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), Hocon = bbmustache:render( iolist_to_binary(hocon_config_template()), Args#{ "authentication" => AuthConfRendered, "bridge_name" => Name, - "ssl" => SSLConfRendered + "ssl" => SSLConfRendered, + "query_mode" => QueryMode } ), Hocon. @@ -630,6 +674,7 @@ bridges.kafka.{{ bridge_name }} { } partition_strategy = {{ partition_strategy }} topic = \"{{ kafka_topic }}\" + query_mode = {{ query_mode }} } metadata_request_timeout = 5s min_metadata_refresh_interval = 3s diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 71184e872..887031e2e 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -49,8 +48,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b8157d4fc..0dcf1bf66 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -11,7 +11,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2, @@ -70,10 +70,8 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> true. +query_mode(_Config) -> + simple_async. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 0dc73b5f6..251158029 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -34,7 +34,6 @@ %% Optional callbacks on_get_status/2, on_query/3, - is_buffer_supported/0, on_batch_query/3 ]). @@ -187,11 +186,6 @@ callback_mode() -> always_sync. %% emqx_resource callback --spec is_buffer_supported() -> boolean(). -is_buffer_supported() -> - %% We want to make use of EMQX's buffer mechanism - false. - %% emqx_resource callback called when the resource is started -spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..d9a9eae3b 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -86,8 +85,6 @@ servers() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..d814e6205 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,7 +30,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -169,8 +168,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( ResourceId = PoolName, #{ diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 8fd41443c..97823e0d7 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -79,8 +78,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 0f543badd..ed023ef30 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -18,7 +18,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -68,8 +67,6 @@ % be sync for now. callback_mode() -> always_sync. -is_buffer_supported() -> false. - -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 562e18d52..8e17ac15c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,7 +22,7 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. --type query_mode() :: async | sync | dynamic. +-type query_mode() :: async | sync | simple_async | simple_sync | dynamic. -type result() :: term(). -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type query_opts() :: #{ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 169b326c8..5a8eab324 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -100,7 +100,8 @@ call_health_check/3, %% stop the instance call_stop/3, - is_buffer_supported/1 + %% get the query mode of the resource + query_mode/3 ]). %% list all the instances, id only. @@ -132,7 +133,7 @@ on_query_async/4, on_batch_query_async/4, on_get_status/2, - is_buffer_supported/0 + query_mode/1 ]). %% when calling emqx_resource:start/1 @@ -173,7 +174,8 @@ | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. --callback is_buffer_supported() -> boolean(). +-callback query_mode(Config :: term()) -> + simple_sync | simple_async | sync | async | no_queries. -spec list_types() -> [module()]. list_types() -> @@ -276,16 +278,20 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, mod := Module}} -> - IsBufferSupported = is_buffer_supported(Module), - case {IsBufferSupported, QM} of - {true, _} -> - %% only Kafka producer so far + {ok, _Group, #{query_mode := QM}} -> + case QM of + simple_async -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again Opts1 = Opts#{is_buffer_supported => true}, emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); - {false, sync} -> + simple_sync -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_query(ResId, Request); + sync -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - {false, async} -> + async -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> @@ -367,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). --spec is_buffer_supported(module()) -> boolean(). -is_buffer_supported(Module) -> - try - Module:is_buffer_supported() - catch - _:_ -> - false - end. - -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -416,6 +413,17 @@ call_stop(ResId, Mod, ResourceState) -> Res end). +-spec query_mode(module(), term(), creation_opts()) -> + simple_sync | simple_async | sync | async | no_queries. + +query_mode(Mod, Config, Opts) -> + case erlang:function_exported(Mod, query_mode, 1) of + true -> + Mod:query_mode(Config); + false -> + maps:get(query_mode, Opts, sync) + end. + -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 02dda8021..c1adb8ecd 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) -> ], [matched] ), - case emqx_resource:is_buffer_supported(ResourceType) of - true -> - %% the resource it self supports - %% buffer, so there is no need for resource workers + QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), + case QueryMode of + %% the resource has built-in buffer, so there is no need for resource workers + simple_sync -> ok; - false -> + simple_async -> + ok; + %% The resource is a consumer resource, so there is no need for resource workers + no_queries -> + ok; + _ -> + %% start resource workers as the query type requires them ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> @@ -288,16 +294,17 @@ health_check(ResId) -> %% @doc Function called from the supervisor to actually start the server start_link(ResId, Group, ResourceType, Config, Opts) -> + QueryMode = emqx_resource:query_mode( + ResourceType, + Config, + Opts + ), Data = #data{ id = ResId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), - %% query_mode = dynamic | sync | async - %% TODO: - %% dynamic mode is async mode when things are going well, but becomes sync mode - %% if the resource worker is overloaded - query_mode = maps:get(query_mode, Opts, sync), + query_mode = QueryMode, config = Config, opts = Opts, state = undefined, diff --git a/changes/ee/feat-10970.en.md b/changes/ee/feat-10970.en.md new file mode 100644 index 000000000..d716f6eef --- /dev/null +++ b/changes/ee/feat-10970.en.md @@ -0,0 +1 @@ +A query_mode parameter has been added to the Kafka producer bridge. This parameter allows you to specify if the bridge should use the asynchronous or synchronous mode when sending data to Kafka. The default is asynchronous mode. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 3ed460492..702b9ff09 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 59f763904..0c5a839e9 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -15,7 +15,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -28,8 +27,6 @@ callback_mode() -> emqx_connector_mongo:callback_mode(). -is_buffer_supported() -> false. - on_start(InstanceId, Config) -> case emqx_connector_mongo:on_start(InstanceId, Config) of {ok, ConnectorState} -> diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index ef2e27972..fcb9b0074 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -166,7 +166,7 @@ consumer_offset_reset_policy.label: partition_count_refresh_interval.desc: """The time interval for Kafka producer to discover increased number of partitions. -After the number of partitions is increased in Kafka, EMQX will start taking the +After the number of partitions is increased in Kafka, EMQX will start taking the discovered partitions into account when dispatching messages per partition_strategy.""" partition_count_refresh_interval.label: @@ -358,4 +358,16 @@ compression.desc: compression.label: """Compression""" +query_mode.desc: +"""Query mode. Optional 'sync/async', default 'async'.""" + +query_mode.label: +"""Query mode""" + +sync_query_timeout.desc: +"""This parameter defines the timeout limit for synchronous queries. It applies only when the bridge query mode is configured to 'sync'.""" + +sync_query_timeout.label: +"""Synchronous Query Timeout""" + }