fix: add inflight window setting to the clickhouse bridge

This commit makes sure the inflight window setting is present for the
clickhouse bridge. It also changes emqx_resource_schema that previously
removed the inflight window setting from resources with query mode
`always_sync`. We don't need to do that because all bridges that uses
the buffer worker queue will get async call handling even if the bridge
don't support the async callback.

Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>
This commit is contained in:
Kjell Winblad 2023-03-21 16:30:45 +01:00
parent fd23800370
commit 27b8445337
3 changed files with 15 additions and 20 deletions

View File

@ -135,14 +135,14 @@ When disabled the messages are buffered in RAM only."""
} }
} }
async_inflight_window { inflight_window {
desc { desc {
en: """Async query inflight window.""" en: """Query inflight window. When query_mode is set to async, this config has to be et to 1 if messages from the same MQTT client have to be stricktly ordered."""
zh: """异步请求飞行队列窗口大小。""" zh: """请求飞行队列窗口大小。当请求模式为异步时,如果需要严格保证来自同一 MQTT 客户端的消息有序,则必须将此值设为 1。"""""
} }
label { label {
en: """Async inflight window""" en: """Inflight window"""
zh: """异步请求飞行队列窗口""" zh: """请求飞行队列窗口"""
} }
} }

View File

@ -39,10 +39,9 @@ fields("resource_opts_sync_only") ->
)} )}
]; ];
fields("creation_opts_sync_only") -> fields("creation_opts_sync_only") ->
Fields0 = fields("creation_opts"), Fields = fields("creation_opts"),
Fields1 = lists:keydelete(async_inflight_window, 1, Fields0),
QueryMod = {query_mode, fun query_mode_sync_only/1}, QueryMod = {query_mode, fun query_mode_sync_only/1},
lists:keyreplace(query_mode, 1, Fields1, QueryMod); lists:keyreplace(query_mode, 1, Fields, QueryMod);
fields("resource_opts") -> fields("resource_opts") ->
[ [
{resource_opts, {resource_opts,
@ -60,7 +59,7 @@ fields("creation_opts") ->
{auto_restart_interval, fun auto_restart_interval/1}, {auto_restart_interval, fun auto_restart_interval/1},
{query_mode, fun query_mode/1}, {query_mode, fun query_mode/1},
{request_timeout, fun request_timeout/1}, {request_timeout, fun request_timeout/1},
{async_inflight_window, fun async_inflight_window/1}, {inflight_window, fun inflight_window/1},
{enable_batch, fun enable_batch/1}, {enable_batch, fun enable_batch/1},
{batch_size, fun batch_size/1}, {batch_size, fun batch_size/1},
{batch_time, fun batch_time/1}, {batch_time, fun batch_time/1},
@ -136,11 +135,12 @@ enable_queue(deprecated) -> {since, "v5.0.14"};
enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(desc) -> ?DESC("enable_queue");
enable_queue(_) -> undefined. enable_queue(_) -> undefined.
async_inflight_window(type) -> pos_integer(); inflight_window(type) -> pos_integer();
async_inflight_window(desc) -> ?DESC("async_inflight_window"); inflight_window(aliases) -> [async_inflight_window];
async_inflight_window(default) -> ?DEFAULT_INFLIGHT; inflight_window(desc) -> ?DESC("inflight_window");
async_inflight_window(required) -> false; inflight_window(default) -> ?DEFAULT_INFLIGHT;
async_inflight_window(_) -> undefined. inflight_window(required) -> false;
inflight_window(_) -> undefined.
batch_size(type) -> pos_integer(); batch_size(type) -> pos_integer();
batch_size(desc) -> ?DESC("batch_size"); batch_size(desc) -> ?DESC("batch_size");

View File

@ -107,8 +107,7 @@ fields("config") ->
] ++ ] ++
emqx_ee_connector_clickhouse:fields(config); emqx_ee_connector_clickhouse:fields(config);
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), emqx_resource_schema:fields("creation_opts");
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
fields("post") -> fields("post") ->
fields("post", clickhouse); fields("post", clickhouse);
fields("put") -> fields("put") ->
@ -131,10 +130,6 @@ desc(_) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal %% internal
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
is_hidden_opts(Field) ->
lists:member(Field, [
async_inflight_window
]).
type_field(Type) -> type_field(Type) ->
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.