diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 0cc3f993b..214d6b642 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -64,8 +63,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 65a0336ec..18cfca1d9 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -22,8 +22,7 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2, - is_buffer_supported/0 + on_get_status/2 ]). -export([reply_delegator/3]). @@ -56,8 +55,6 @@ %% emqx_resource API %%------------------------------------------------------------------------------------------------- -is_buffer_supported() -> false. - callback_mode() -> async_if_possible. -spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 9cabaf5e9..54cddb735 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -147,7 +147,7 @@ fields("config_producer") -> fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> - fields("config") ++ fields(resource_opts) ++ fields(producer_opts); + fields("config") ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -295,28 +295,6 @@ fields(producer_kafka_opts) -> required => false, desc => ?DESC(producer_buffer) })}, - {query_mode_sync_timeout, - mk( - emqx_schema:duration_ms(), - #{ - default => <<"5s">>, - desc => ?DESC(query_mode_sync_timeout) - } - )} - ]; -fields(resource_opts) -> - [ - {resource_opts, - mk( - ref(?MODULE, resource_opts_fields), - #{ - required => false, - desc => ?DESC(resource_opts) - } - )} - ]; -fields(resource_opts_fields) -> - [ {query_mode, mk( enum([async, sync]), @@ -324,6 +302,14 @@ fields(resource_opts_fields) -> default => async, desc => ?DESC(query_mode) } + )}, + {query_mode_sync_timeout, + mk( + emqx_schema:duration_ms(), + #{ + default => <<"5s">>, + desc => ?DESC(query_mode_sync_timeout) + } )} ]; fields(kafka_message) -> @@ -440,8 +426,7 @@ struct_names() -> producer_opts, consumer_opts, consumer_kafka_opts, - consumer_topic_mapping, - resource_opts_fields + consumer_topic_mapping ]. %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c0de23d94..5f59b4756 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -8,7 +8,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2 @@ -112,11 +112,9 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> - true. +%% consumer bridges don't need resource workers +query_mode(_Config) -> + no_queries. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(ResourceId, Config) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8be506efd..eee0cdd99 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -4,10 +4,11 @@ -module(emqx_bridge_kafka_impl_producer). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %% callbacks of behaviour emqx_resource -export([ - is_buffer_supported/0, + query_mode/1, callback_mode/0, on_start/2, on_stop/2, @@ -32,7 +33,10 @@ %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). -is_buffer_supported() -> true. +query_mode(#{kafka := #{query_mode := sync}}) -> + simple_sync; +query_mode(_) -> + simple_async. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 71184e872..887031e2e 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -49,8 +48,6 @@ fields(config) -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index b8157d4fc..1d93ffd0d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -11,7 +11,7 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, + query_mode/1, on_start/2, on_stop/2, on_get_status/2, @@ -70,10 +70,9 @@ callback_mode() -> async_if_possible. -%% there are no queries to be made to this bridge, so we say that -%% buffer is supported so we don't spawn unused resource buffer -%% workers. -is_buffer_supported() -> true. +%% consumer bridges don't need resource workers +query_mode(_Config) -> + no_queries. -spec on_start(resource_id(), config()) -> {ok, state()}. on_start(InstanceId, Config) -> diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 0dc73b5f6..251158029 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -34,7 +34,6 @@ %% Optional callbacks on_get_status/2, on_query/3, - is_buffer_supported/0, on_batch_query/3 ]). @@ -187,11 +186,6 @@ callback_mode() -> always_sync. %% emqx_resource callback --spec is_buffer_supported() -> boolean(). -is_buffer_supported() -> - %% We want to make use of EMQX's buffer mechanism - false. - %% emqx_resource callback called when the resource is started -spec on_start(resource_id(), term()) -> {ok, resource_state()} | {error, _}. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index a7d01960e..d9a9eae3b 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -86,8 +85,6 @@ servers() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..d814e6205 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,7 +30,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -169,8 +168,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( ResourceId = PoolName, #{ diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 8fd41443c..97823e0d7 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -17,7 +17,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -79,8 +78,6 @@ server() -> callback_mode() -> always_sync. -is_buffer_supported() -> false. - on_start( InstanceId, #{ diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 0f543badd..ed023ef30 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -18,7 +18,6 @@ %% callbacks for behaviour emqx_resource -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -68,8 +67,6 @@ % be sync for now. callback_mode() -> always_sync. -is_buffer_supported() -> false. - -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index c700cfd86..2f43fd926 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -100,7 +100,8 @@ call_health_check/3, %% stop the instance call_stop/3, - is_buffer_supported/1 + %% get the query mode of the resource + query_mode/3 ]). %% list all the instances, id only. @@ -132,7 +133,7 @@ on_query_async/4, on_batch_query_async/4, on_get_status/2, - is_buffer_supported/0 + query_mode/1 ]). %% when calling emqx_resource:start/1 @@ -173,7 +174,8 @@ | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. --callback is_buffer_supported() -> boolean(). +-callback query_mode(Config :: term()) -> + simple_sync | simple_async | sync | async | no_queries. -spec list_types() -> [module()]. list_types() -> @@ -276,27 +278,26 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, mod := Module} = Config} -> - IsBufferSupported = is_buffer_supported(Module), - case {IsBufferSupported, QM} of - {true, _} -> - %% only Kafka producer so far + {ok, _Group, #{query_mode := QM}} -> + case QM of + simple_async -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again Opts1 = Opts#{is_buffer_supported => true}, - do_query_built_in_buffer(QM, ResId, Request, Opts1); - {false, sync} -> + emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); + simple_sync -> + %% TODO(5.1.1): pass Resource instead of ResId to simple APIs + %% so the buffer worker does not need to lookup the cache again + emqx_resource_buffer_worker:simple_sync_query(ResId, Request); + sync -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - {false, async} -> + async -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. -do_query_built_in_buffer(async, ResId, Request, Opts1) -> - emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts1); -do_query_built_in_buffer(sync, ResId, Request, _Opts1) -> - emqx_resource_buffer_worker:simple_sync_query(ResId, Request). - -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> emqx_resource_buffer_worker:simple_sync_query(ResId, Request). @@ -372,15 +373,6 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). --spec is_buffer_supported(module()) -> boolean(). -is_buffer_supported(Module) -> - try - Module:is_buffer_supported() - catch - _:_ -> - false - end. - -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -417,6 +409,17 @@ call_stop(ResId, Mod, ResourceState) -> Res end). +-spec query_mode(module(), term(), creation_opts()) -> + simple_sync | simple_async | sync | async | no_queries. + +query_mode(Mod, Config, Opts) -> + case erlang:function_exported(Mod, query_mode, 1) of + true -> + Mod:query_mode(Config); + false -> + maps:get(query_mode, Opts, sync) + end. + -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 02dda8021..be58bf0e0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -144,12 +144,18 @@ create(ResId, Group, ResourceType, Config, Opts) -> ], [matched] ), - case emqx_resource:is_buffer_supported(ResourceType) of - true -> - %% the resource it self supports - %% buffer, so there is no need for resource workers + QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), + case QueryMode of + %% the resource has built-in buffer, so there is no need for resource workers + simple_sync -> ok; - false -> + simple_async -> + ok; + %% The resource is a consumer resource, so there is no need for resource workers + no_queries -> + ok; + _ -> + %% start resource workers as the query type requires them ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> @@ -288,16 +294,20 @@ health_check(ResId) -> %% @doc Function called from the supervisor to actually start the server start_link(ResId, Group, ResourceType, Config, Opts) -> + QueryMode = + case erlang:function_exported(ResourceType, query_mode, 1) of + true -> + ResourceType:query_mode(Config); + false -> + maps:get(query_mode, Opts, sync) + end, + Data = #data{ id = ResId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), - %% query_mode = dynamic | sync | async - %% TODO: - %% dynamic mode is async mode when things are going well, but becomes sync mode - %% if the resource worker is overloaded - query_mode = maps:get(query_mode, Opts, sync), + query_mode = QueryMode, config = Config, opts = Opts, state = undefined, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 3ed460492..702b9ff09 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.13"}, + {vsn, "0.1.14"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl index 59f763904..0c5a839e9 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl @@ -15,7 +15,6 @@ %% `emqx_resource' API -export([ callback_mode/0, - is_buffer_supported/0, on_start/2, on_stop/2, on_query/3, @@ -28,8 +27,6 @@ callback_mode() -> emqx_connector_mongo:callback_mode(). -is_buffer_supported() -> false. - on_start(InstanceId, Config) -> case emqx_connector_mongo:on_start(InstanceId, Config) of {ok, ConnectorState} ->