From 27b84453371fe99c3a8b0e2e0e0aaed274b1d31c Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 21 Mar 2023 16:30:45 +0100 Subject: [PATCH 1/3] fix: add inflight window setting to the clickhouse bridge This commit makes sure the inflight window setting is present for the clickhouse bridge. It also changes emqx_resource_schema that previously removed the inflight window setting from resources with query mode `always_sync`. We don't need to do that because all bridges that uses the buffer worker queue will get async call handling even if the bridge don't support the async callback. Co-authored-by: Zaiming (Stone) Shi --- .../i18n/emqx_resource_schema_i18n.conf | 10 +++++----- .../src/schema/emqx_resource_schema.erl | 18 +++++++++--------- .../src/emqx_ee_bridge_clickhouse.erl | 7 +------ 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index fb6b2eb06..3a6b50e83 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -135,14 +135,14 @@ When disabled the messages are buffered in RAM only.""" } } - async_inflight_window { + inflight_window { desc { - en: """Async query inflight window.""" - zh: """异步请求飞行队列窗口大小。""" + en: """Query inflight window. When query_mode is set to async, this config has to be et to 1 if messages from the same MQTT client have to be stricktly ordered.""" + zh: """请求飞行队列窗口大小。当请求模式为异步时,如果需要严格保证来自同一 MQTT 客户端的消息有序,则必须将此值设为 1。""""" } label { - en: """Async inflight window""" - zh: """异步请求飞行队列窗口""" + en: """Inflight window""" + zh: """请求飞行队列窗口""" } } diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fdd65bc3c..e0f68d639 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -39,10 +39,9 @@ fields("resource_opts_sync_only") -> )} ]; fields("creation_opts_sync_only") -> - Fields0 = fields("creation_opts"), - Fields1 = lists:keydelete(async_inflight_window, 1, Fields0), + Fields = fields("creation_opts"), QueryMod = {query_mode, fun query_mode_sync_only/1}, - lists:keyreplace(query_mode, 1, Fields1, QueryMod); + lists:keyreplace(query_mode, 1, Fields, QueryMod); fields("resource_opts") -> [ {resource_opts, @@ -60,7 +59,7 @@ fields("creation_opts") -> {auto_restart_interval, fun auto_restart_interval/1}, {query_mode, fun query_mode/1}, {request_timeout, fun request_timeout/1}, - {async_inflight_window, fun async_inflight_window/1}, + {inflight_window, fun inflight_window/1}, {enable_batch, fun enable_batch/1}, {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, @@ -136,11 +135,12 @@ enable_queue(deprecated) -> {since, "v5.0.14"}; enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(_) -> undefined. -async_inflight_window(type) -> pos_integer(); -async_inflight_window(desc) -> ?DESC("async_inflight_window"); -async_inflight_window(default) -> ?DEFAULT_INFLIGHT; -async_inflight_window(required) -> false; -async_inflight_window(_) -> undefined. +inflight_window(type) -> pos_integer(); +inflight_window(aliases) -> [async_inflight_window]; +inflight_window(desc) -> ?DESC("inflight_window"); +inflight_window(default) -> ?DEFAULT_INFLIGHT; +inflight_window(required) -> false; +inflight_window(_) -> undefined. batch_size(type) -> pos_integer(); batch_size(desc) -> ?DESC("batch_size"); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl index 9e03aca4a..89b07c6e4 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl @@ -107,8 +107,7 @@ fields("config") -> ] ++ emqx_ee_connector_clickhouse:fields(config); fields("creation_opts") -> - Opts = emqx_resource_schema:fields("creation_opts"), - [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; + emqx_resource_schema:fields("creation_opts"); fields("post") -> fields("post", clickhouse); fields("put") -> @@ -131,10 +130,6 @@ desc(_) -> %% ------------------------------------------------------------------------------------------------- %% internal %% ------------------------------------------------------------------------------------------------- -is_hidden_opts(Field) -> - lists:member(Field, [ - async_inflight_window - ]). type_field(Type) -> {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. From 9d3f369cca524f1d038e9bde053d1abfceeb399b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Mar 2023 14:09:57 +0100 Subject: [PATCH 2/3] docs: fix spelling mistake Co-authored-by: Thales Macedo Garitezi --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 3a6b50e83..9b6990035 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -137,7 +137,7 @@ When disabled the messages are buffered in RAM only.""" inflight_window { desc { - en: """Query inflight window. When query_mode is set to async, this config has to be et to 1 if messages from the same MQTT client have to be stricktly ordered.""" + en: """Query inflight window. When query_mode is set to async, this config has to be set to 1 if messages from the same MQTT client have to be strictly ordered.""" zh: """请求飞行队列窗口大小。当请求模式为异步时,如果需要严格保证来自同一 MQTT 客户端的消息有序,则必须将此值设为 1。""""" } label { From 35474578ca6f1909da19de4d3928e8ea62fe00f6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 Mar 2023 14:19:17 +0100 Subject: [PATCH 3/3] refactor: rename async_inflight_window to inflight_window everywhere --- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- .../src/schema/emqx_bridge_compatible_config.erl | 2 +- apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/src/emqx_resource_buffer_worker.erl | 2 +- apps/emqx_resource/test/emqx_resource_SUITE.erl | 10 +++++----- scripts/test/influx/influx-bridge.conf | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8ac3e476a..ff6842e37 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -238,7 +238,7 @@ info_example_basic(webhook) -> health_check_interval => 15000, auto_restart_interval => 15000, query_mode => async, - async_inflight_window => 100, + inflight_window => 100, max_queue_bytes => 100 * 1024 * 1024 } }; diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index 1e55d0c0e..fe173fa89 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -86,7 +86,7 @@ default_ssl() -> default_resource_opts() -> #{ - <<"async_inflight_window">> => 100, + <<"inflight_window">> => 100, <<"auto_restart_interval">> => <<"60s">>, <<"health_check_interval">> => <<"15s">>, <<"max_queue_bytes">> => <<"1GB">>, diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index f249aa95e..e222190d2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -172,7 +172,7 @@ bridge_async_config(#{port := Port} = Config) -> " request_timeout = \"~ps\"\n" " body = \"${id}\"" " resource_opts {\n" - " async_inflight_window = 100\n" + " inflight_window = 100\n" " auto_restart_interval = \"60s\"\n" " health_check_interval = \"15s\"\n" " max_queue_bytes = \"1GB\"\n" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 41be9e8a0..4e438bb79 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -73,7 +73,7 @@ max_queue_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), - async_inflight_window => pos_integer() + inflight_window => pos_integer() }. -type query_result() :: ok diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..7934a2a4e 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -193,7 +193,7 @@ init({Id, Index, Opts}) -> Queue = replayq:open(QueueOpts), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), emqx_resource_metrics:inflight_set(Id, Index, 0), - InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), + InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT), InflightTID = inflight_new(InflightWinSize, Id, Index), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e7c252fa9..990e5b472 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -369,7 +369,7 @@ t_query_counter_async_callback(_) -> #{ query_mode => async, batch_size => 1, - async_inflight_window => 1000000 + inflight_window => 1000000 } ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), @@ -450,7 +450,7 @@ t_query_counter_async_inflight(_) -> #{ query_mode => async, batch_size => 1, - async_inflight_window => WindowSize, + inflight_window => WindowSize, worker_pool_size => 1, resume_interval => 300 } @@ -634,7 +634,7 @@ t_query_counter_async_inflight_batch(_) -> query_mode => async, batch_size => BatchSize, batch_time => 100, - async_inflight_window => WindowSize, + inflight_window => WindowSize, worker_pool_size => 1, resume_interval => 300 } @@ -1584,7 +1584,7 @@ t_retry_async_inflight_full(_Config) -> #{name => ?FUNCTION_NAME}, #{ query_mode => async, - async_inflight_window => AsyncInflightWindow, + inflight_window => AsyncInflightWindow, batch_size => 1, worker_pool_size => 1, resume_interval => ResumeInterval @@ -1642,7 +1642,7 @@ t_async_reply_multi_eval(_Config) -> #{name => ?FUNCTION_NAME}, #{ query_mode => async, - async_inflight_window => AsyncInflightWindow, + inflight_window => AsyncInflightWindow, batch_size => 3, batch_time => 10, worker_pool_size => 1, diff --git a/scripts/test/influx/influx-bridge.conf b/scripts/test/influx/influx-bridge.conf index df10a0ec6..0416e42b6 100644 --- a/scripts/test/influx/influx-bridge.conf +++ b/scripts/test/influx/influx-bridge.conf @@ -6,7 +6,7 @@ bridges { org = "emqx" precision = "ms" resource_opts { - async_inflight_window = 100 + inflight_window = 100 auto_restart_interval = "60s" batch_size = 100 batch_time = "10ms"