diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf new file mode 100644 index 000000000..a3fb6c402 --- /dev/null +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -0,0 +1,119 @@ +emqx_resource_schema { + batch { + desc { + en: """ +Configuration of batch query.
+Batch requests are made immediately when the number of requests reaches the `batch_size`, or also immediately when the number of requests is less than the batch request size but the maximum batch_time has been reached. +""" + zh: """ +批量请求配置。
+请求数达到批量请求大小时立刻进行批量请求,或当请求数不足批量请求数大小,但已经达到最大批量等待时间时也立即进行批量请求。 +""" + } + label { + en: """batch""" + zh: """批量请求""" + } + } + + queue { + desc { + en: """Configuration of queue.""" + zh: """请求队列配置""" + } + label { + en: """queue""" + zh: """请求队列""" + } + } + + query_mode { + desc { + en: """Query mode. Optional 'sync/async', default 'sync'.""" + zh: """请求模式。可选 '同步/异步',默认为'同步'模式。""" + } + label { + en: """query_mode""" + zh: """query_mode""" + } + } + + enable_batch { + desc { + en: """Batch mode enabled.""" + zh: """启用批量模式。""" + } + label { + en: """enable_batch""" + zh: """启用批量模式""" + } + } + + enable_queue { + desc { + en: """Queue mode enabled.""" + zh: """启用队列模式。""" + } + label { + en: """enable_queue""" + zh: """启用队列模式""" + } + } + + resume_interval { + desc { + en: """Resume time interval when resource down.""" + zh: """资源不可用时的重试时间""" + } + label { + en: """resume_interval""" + zh: """恢复时间""" + } + } + + async_inflight_window { + desc { + en: """Async queyr inflight window.""" + zh: """异步请求飞行队列窗口大小""" + } + label { + en: """async_inflight_window""" + zh: """异步请求飞行队列窗口""" + } + } + + batch_size { + desc { + en: """Maximum batch count.""" + zh: """批量请求大小""" + } + label { + en: """batch_size""" + zh: """批量请求大小""" + } + } + + batch_time { + desc { + en: """Maximum batch waiting interval.""" + zh: """最大批量请求等待时间。""" + } + label { + en: """batch_time""" + zh: """批量等待间隔""" + } + } + + queue_max_bytes { + desc { + en: """Maximum queue storage size in bytes.""" + zh: """消息队列的最大长度,以字节计。""" + } + label { + en: """queue_max_bytes""" + zh: """队列最大长度""" + } + } + + +} diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5c561a8d3..5327e3aae 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -60,5 +60,17 @@ | {error, term()} | {resource_down, term()}. +%% count +-define(DEFAULT_BATCH_SIZE, 100). +%% milliseconds +-define(DEFAULT_BATCH_TIME, 10). + +%% bytes +-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). + +-define(DEFAULT_INFLIGHT, 100). + +-define(RESUME_INTERVAL, 15000). + -define(TEST_ID_PREFIX, "_test_:"). -define(RES_METRICS, resource_metrics). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 716842bf9..e940dcb69 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -52,13 +52,6 @@ -export([reply_after_query/6, batch_reply_after_query/6]). --define(RESUME_INTERVAL, 15000). - -%% count --define(DEFAULT_BATCH_SIZE, 100). -%% milliseconds --define(DEFAULT_BATCH_TIME, 10). - -define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). @@ -69,8 +62,6 @@ {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} ). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). --define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). --define(DEFAULT_INFLIGHT, 100). -type id() :: binary(). -type query() :: {query, from(), request()}. @@ -122,7 +113,7 @@ init({Id, Index, Opts}) -> Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), Queue = - case maps:get(queue_enabled, Opts, false) of + case maps:get(enable_queue, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), @@ -144,7 +135,7 @@ init({Id, Index, Opts}) -> %% if the resource worker is overloaded query_mode => maps:get(query_mode, Opts, sync), async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), - batch_enabled => maps:get(batch_enabled, Opts, false), + enable_batch => maps:get(enable_batch, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, @@ -270,14 +261,14 @@ drop_head(Q) -> ok = replayq:ack(Q1, AckRef), Q1. -query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Left} = St0) -> +query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left} = St0) -> Acc1 = [?QUERY(From, Request) | Acc], St = St0#{acc := Acc1, acc_left := Left - 1}, case Left =< 1 of true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl new file mode 100644 index 000000000..933cd0189 --- /dev/null +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -0,0 +1,99 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_resource_schema). + +-include("emqx_resource.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([namespace/0, roots/0, fields/1]). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "resource_schema". + +roots() -> []. + +fields('batch&async&queue') -> + [ + {query_mode, fun query_mode/1}, + {resume_interval, fun resume_interval/1}, + {async_inflight_window, fun async_inflight_window/1}, + {batch, mk(ref(?MODULE, batch), #{desc => ?DESC("batch")})}, + {queue, mk(ref(?MODULE, queue), #{desc => ?DESC("queue")})} + ]; +fields(batch) -> + [ + {enable_batch, fun enable_batch/1}, + {batch_size, fun batch_size/1}, + {batch_time, fun batch_time/1} + ]; +fields(queue) -> + [ + {enable_queue, fun enable_queue/1}, + {max_queue_bytes, fun queue_max_bytes/1} + ]. + +query_mode(type) -> enum([sync, async]); +query_mode(desc) -> ?DESC("query_mode"); +query_mode(default) -> sync; +query_mode(required) -> false; +query_mode(_) -> undefined. + +enable_batch(type) -> boolean(); +enable_batch(required) -> false; +enable_batch(default) -> false; +enable_batch(desc) -> ?DESC("enable_batch"); +enable_batch(_) -> undefined. + +enable_queue(type) -> boolean(); +enable_queue(required) -> false; +enable_queue(default) -> false; +enable_queue(desc) -> ?DESC("enable_queue"); +enable_queue(_) -> undefined. + +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(default) -> ?RESUME_INTERVAL; +resume_interval(required) -> false; +resume_interval(_) -> undefined. + +async_inflight_window(type) -> pos_integer(); +async_inflight_window(desc) -> ?DESC("async_inflight_window"); +async_inflight_window(default) -> ?DEFAULT_INFLIGHT; +async_inflight_window(required) -> false; +async_inflight_window(_) -> undefined. + +batch_size(type) -> pos_integer(); +batch_size(desc) -> ?DESC("batch_size"); +batch_size(default) -> ?DEFAULT_BATCH_SIZE; +batch_size(required) -> false; +batch_size(_) -> undefined. + +batch_time(type) -> emqx_schema:duration_ms(); +batch_time(desc) -> ?DESC("batch_time"); +batch_time(default) -> ?DEFAULT_BATCH_TIME; +batch_time(required) -> false; +batch_time(_) -> undefined. + +queue_max_bytes(type) -> emqx_schema:bytesize(); +queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); +queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE; +queue_max_bytes(required) -> false; +queue_max_bytes(_) -> undefined. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5c62e22c2..ddd671b75 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -211,7 +211,7 @@ t_batch_query_counter(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{batch_enabled => true} + #{enable_batch => true} ), ?check_trace(