Merge pull request #10197 from kjellwinblad/0321-fix-inflight-window-hand-over-to-kjell
fix: add inflight window setting to the clickhouse bridge
This commit is contained in:
commit
8e0d315b7b
|
@ -218,7 +218,7 @@ info_example_basic(webhook) ->
|
||||||
health_check_interval => 15000,
|
health_check_interval => 15000,
|
||||||
auto_restart_interval => 15000,
|
auto_restart_interval => 15000,
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
async_inflight_window => 100,
|
inflight_window => 100,
|
||||||
max_queue_bytes => 100 * 1024 * 1024
|
max_queue_bytes => 100 * 1024 * 1024
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -86,7 +86,7 @@ default_ssl() ->
|
||||||
|
|
||||||
default_resource_opts() ->
|
default_resource_opts() ->
|
||||||
#{
|
#{
|
||||||
<<"async_inflight_window">> => 100,
|
<<"inflight_window">> => 100,
|
||||||
<<"auto_restart_interval">> => <<"60s">>,
|
<<"auto_restart_interval">> => <<"60s">>,
|
||||||
<<"health_check_interval">> => <<"15s">>,
|
<<"health_check_interval">> => <<"15s">>,
|
||||||
<<"max_queue_bytes">> => <<"1GB">>,
|
<<"max_queue_bytes">> => <<"1GB">>,
|
||||||
|
|
|
@ -172,7 +172,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
" request_timeout = \"~ps\"\n"
|
" request_timeout = \"~ps\"\n"
|
||||||
" body = \"${id}\""
|
" body = \"${id}\""
|
||||||
" resource_opts {\n"
|
" resource_opts {\n"
|
||||||
" async_inflight_window = 100\n"
|
" inflight_window = 100\n"
|
||||||
" auto_restart_interval = \"60s\"\n"
|
" auto_restart_interval = \"60s\"\n"
|
||||||
" health_check_interval = \"15s\"\n"
|
" health_check_interval = \"15s\"\n"
|
||||||
" max_queue_bytes = \"1GB\"\n"
|
" max_queue_bytes = \"1GB\"\n"
|
||||||
|
|
|
@ -146,14 +146,14 @@ When disabled the messages are buffered in RAM only."""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async_inflight_window {
|
inflight_window {
|
||||||
desc {
|
desc {
|
||||||
en: """Async query inflight window."""
|
en: """Query inflight window. When query_mode is set to async, this config has to be set to 1 if messages from the same MQTT client have to be strictly ordered."""
|
||||||
zh: """异步请求飞行队列窗口大小。"""
|
zh: """请求飞行队列窗口大小。当请求模式为异步时,如果需要严格保证来自同一 MQTT 客户端的消息有序,则必须将此值设为 1。"""""
|
||||||
}
|
}
|
||||||
label {
|
label {
|
||||||
en: """Async inflight window"""
|
en: """Inflight window"""
|
||||||
zh: """异步请求飞行队列窗口"""
|
zh: """请求飞行队列窗口"""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@
|
||||||
max_queue_bytes => pos_integer(),
|
max_queue_bytes => pos_integer(),
|
||||||
query_mode => query_mode(),
|
query_mode => query_mode(),
|
||||||
resume_interval => pos_integer(),
|
resume_interval => pos_integer(),
|
||||||
async_inflight_window => pos_integer()
|
inflight_window => pos_integer()
|
||||||
}.
|
}.
|
||||||
-type query_result() ::
|
-type query_result() ::
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -195,7 +195,7 @@ init({Id, Index, Opts}) ->
|
||||||
Queue = replayq:open(QueueOpts),
|
Queue = replayq:open(QueueOpts),
|
||||||
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
|
||||||
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
emqx_resource_metrics:inflight_set(Id, Index, 0),
|
||||||
InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
|
InflightWinSize = maps:get(inflight_window, Opts, ?DEFAULT_INFLIGHT),
|
||||||
InflightTID = inflight_new(InflightWinSize, Id, Index),
|
InflightTID = inflight_new(InflightWinSize, Id, Index),
|
||||||
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
|
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
|
||||||
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
|
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
|
||||||
|
|
|
@ -39,10 +39,9 @@ fields("resource_opts_sync_only") ->
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields("creation_opts_sync_only") ->
|
fields("creation_opts_sync_only") ->
|
||||||
Fields0 = fields("creation_opts"),
|
Fields = fields("creation_opts"),
|
||||||
Fields1 = lists:keydelete(async_inflight_window, 1, Fields0),
|
|
||||||
QueryMod = {query_mode, fun query_mode_sync_only/1},
|
QueryMod = {query_mode, fun query_mode_sync_only/1},
|
||||||
lists:keyreplace(query_mode, 1, Fields1, QueryMod);
|
lists:keyreplace(query_mode, 1, Fields, QueryMod);
|
||||||
fields("resource_opts") ->
|
fields("resource_opts") ->
|
||||||
[
|
[
|
||||||
{resource_opts,
|
{resource_opts,
|
||||||
|
@ -61,7 +60,7 @@ fields("creation_opts") ->
|
||||||
{auto_restart_interval, fun auto_restart_interval/1},
|
{auto_restart_interval, fun auto_restart_interval/1},
|
||||||
{query_mode, fun query_mode/1},
|
{query_mode, fun query_mode/1},
|
||||||
{request_timeout, fun request_timeout/1},
|
{request_timeout, fun request_timeout/1},
|
||||||
{async_inflight_window, fun async_inflight_window/1},
|
{inflight_window, fun inflight_window/1},
|
||||||
{enable_batch, fun enable_batch/1},
|
{enable_batch, fun enable_batch/1},
|
||||||
{batch_size, fun batch_size/1},
|
{batch_size, fun batch_size/1},
|
||||||
{batch_time, fun batch_time/1},
|
{batch_time, fun batch_time/1},
|
||||||
|
@ -143,11 +142,12 @@ enable_queue(deprecated) -> {since, "v5.0.14"};
|
||||||
enable_queue(desc) -> ?DESC("enable_queue");
|
enable_queue(desc) -> ?DESC("enable_queue");
|
||||||
enable_queue(_) -> undefined.
|
enable_queue(_) -> undefined.
|
||||||
|
|
||||||
async_inflight_window(type) -> pos_integer();
|
inflight_window(type) -> pos_integer();
|
||||||
async_inflight_window(desc) -> ?DESC("async_inflight_window");
|
inflight_window(aliases) -> [async_inflight_window];
|
||||||
async_inflight_window(default) -> ?DEFAULT_INFLIGHT;
|
inflight_window(desc) -> ?DESC("inflight_window");
|
||||||
async_inflight_window(required) -> false;
|
inflight_window(default) -> ?DEFAULT_INFLIGHT;
|
||||||
async_inflight_window(_) -> undefined.
|
inflight_window(required) -> false;
|
||||||
|
inflight_window(_) -> undefined.
|
||||||
|
|
||||||
batch_size(type) -> pos_integer();
|
batch_size(type) -> pos_integer();
|
||||||
batch_size(desc) -> ?DESC("batch_size");
|
batch_size(desc) -> ?DESC("batch_size");
|
||||||
|
|
|
@ -369,7 +369,7 @@ t_query_counter_async_callback(_) ->
|
||||||
#{
|
#{
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
async_inflight_window => 1000000
|
inflight_window => 1000000
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
|
||||||
|
@ -450,7 +450,7 @@ t_query_counter_async_inflight(_) ->
|
||||||
#{
|
#{
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
async_inflight_window => WindowSize,
|
inflight_window => WindowSize,
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
resume_interval => 300
|
resume_interval => 300
|
||||||
}
|
}
|
||||||
|
@ -634,7 +634,7 @@ t_query_counter_async_inflight_batch(_) ->
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
batch_size => BatchSize,
|
batch_size => BatchSize,
|
||||||
batch_time => 100,
|
batch_time => 100,
|
||||||
async_inflight_window => WindowSize,
|
inflight_window => WindowSize,
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
resume_interval => 300
|
resume_interval => 300
|
||||||
}
|
}
|
||||||
|
@ -1584,7 +1584,7 @@ t_retry_async_inflight_full(_Config) ->
|
||||||
#{name => ?FUNCTION_NAME},
|
#{name => ?FUNCTION_NAME},
|
||||||
#{
|
#{
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
async_inflight_window => AsyncInflightWindow,
|
inflight_window => AsyncInflightWindow,
|
||||||
batch_size => 1,
|
batch_size => 1,
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
resume_interval => ResumeInterval
|
resume_interval => ResumeInterval
|
||||||
|
@ -1642,7 +1642,7 @@ t_async_reply_multi_eval(_Config) ->
|
||||||
#{name => ?FUNCTION_NAME},
|
#{name => ?FUNCTION_NAME},
|
||||||
#{
|
#{
|
||||||
query_mode => async,
|
query_mode => async,
|
||||||
async_inflight_window => AsyncInflightWindow,
|
inflight_window => AsyncInflightWindow,
|
||||||
batch_size => 3,
|
batch_size => 3,
|
||||||
batch_time => 10,
|
batch_time => 10,
|
||||||
worker_pool_size => 1,
|
worker_pool_size => 1,
|
||||||
|
|
|
@ -103,8 +103,7 @@ fields("config") ->
|
||||||
] ++
|
] ++
|
||||||
emqx_ee_connector_clickhouse:fields(config);
|
emqx_ee_connector_clickhouse:fields(config);
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
Opts = emqx_resource_schema:fields("creation_opts"),
|
emqx_resource_schema:fields("creation_opts");
|
||||||
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
fields("post", clickhouse);
|
fields("post", clickhouse);
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
@ -127,10 +126,6 @@ desc(_) ->
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% internal
|
%% internal
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
is_hidden_opts(Field) ->
|
|
||||||
lists:member(Field, [
|
|
||||||
async_inflight_window
|
|
||||||
]).
|
|
||||||
|
|
||||||
type_field(Type) ->
|
type_field(Type) ->
|
||||||
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||||
|
|
|
@ -6,7 +6,7 @@ bridges {
|
||||||
org = "emqx"
|
org = "emqx"
|
||||||
precision = "ms"
|
precision = "ms"
|
||||||
resource_opts {
|
resource_opts {
|
||||||
async_inflight_window = 100
|
inflight_window = 100
|
||||||
auto_restart_interval = "60s"
|
auto_restart_interval = "60s"
|
||||||
batch_size = 100
|
batch_size = 100
|
||||||
batch_time = "10ms"
|
batch_time = "10ms"
|
||||||
|
|
Loading…
Reference in New Issue