fix: add query_mode_sync_only for mysql pgsql redis mongodb bridge
This commit is contained in:
parent
dcc6bd9c21
commit
22c3f50020
|
@ -89,6 +89,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
query_mode_sync_only {
|
||||||
|
desc {
|
||||||
|
en: """Query mode. Currently only support 'sync'."""
|
||||||
|
zh: """请求模式。目前只支持同步模式。"""
|
||||||
|
}
|
||||||
|
label {
|
||||||
|
en: """Query mode"""
|
||||||
|
zh: """请求模式"""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
request_timeout {
|
request_timeout {
|
||||||
desc {
|
desc {
|
||||||
en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
|
en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
|
||||||
|
|
|
@ -30,16 +30,25 @@ namespace() -> "resource_schema".
|
||||||
|
|
||||||
roots() -> [].
|
roots() -> [].
|
||||||
|
|
||||||
|
fields("resource_opts_sync_only") ->
|
||||||
|
[
|
||||||
|
{resource_opts,
|
||||||
|
mk(
|
||||||
|
ref(?MODULE, "creation_opts_sync_only"),
|
||||||
|
resource_opts_meta()
|
||||||
|
)}
|
||||||
|
];
|
||||||
|
fields("creation_opts_sync_only") ->
|
||||||
|
Fields0 = fields("creation_opts"),
|
||||||
|
Fields1 = lists:keydelete(async_inflight_window, 1, Fields0),
|
||||||
|
QueryMod = {query_mode, fun query_mode_sync_only/1},
|
||||||
|
lists:keyreplace(query_mode, 1, Fields1, QueryMod);
|
||||||
fields("resource_opts") ->
|
fields("resource_opts") ->
|
||||||
[
|
[
|
||||||
{resource_opts,
|
{resource_opts,
|
||||||
mk(
|
mk(
|
||||||
ref(?MODULE, "creation_opts"),
|
ref(?MODULE, "creation_opts"),
|
||||||
#{
|
resource_opts_meta()
|
||||||
required => false,
|
|
||||||
default => #{},
|
|
||||||
desc => ?DESC(<<"resource_opts">>)
|
|
||||||
}
|
|
||||||
)}
|
)}
|
||||||
];
|
];
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
|
@ -59,6 +68,13 @@ fields("creation_opts") ->
|
||||||
{max_queue_bytes, fun max_queue_bytes/1}
|
{max_queue_bytes, fun max_queue_bytes/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
resource_opts_meta() ->
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
default => #{},
|
||||||
|
desc => ?DESC(<<"resource_opts">>)
|
||||||
|
}.
|
||||||
|
|
||||||
worker_pool_size(type) -> non_neg_integer();
|
worker_pool_size(type) -> non_neg_integer();
|
||||||
worker_pool_size(desc) -> ?DESC("worker_pool_size");
|
worker_pool_size(desc) -> ?DESC("worker_pool_size");
|
||||||
worker_pool_size(default) -> ?WORKER_POOL_SIZE;
|
worker_pool_size(default) -> ?WORKER_POOL_SIZE;
|
||||||
|
@ -95,6 +111,12 @@ query_mode(default) -> async;
|
||||||
query_mode(required) -> false;
|
query_mode(required) -> false;
|
||||||
query_mode(_) -> undefined.
|
query_mode(_) -> undefined.
|
||||||
|
|
||||||
|
query_mode_sync_only(type) -> enum([sync]);
|
||||||
|
query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only");
|
||||||
|
query_mode_sync_only(default) -> async;
|
||||||
|
query_mode_sync_only(required) -> false;
|
||||||
|
query_mode_sync_only(_) -> undefined.
|
||||||
|
|
||||||
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
|
||||||
request_timeout(desc) -> ?DESC("request_timeout");
|
request_timeout(desc) -> ?DESC("request_timeout");
|
||||||
request_timeout(default) -> <<"15s">>;
|
request_timeout(default) -> <<"15s">>;
|
||||||
|
@ -139,4 +161,6 @@ max_queue_bytes(required) -> false;
|
||||||
max_queue_bytes(_) -> undefined.
|
max_queue_bytes(_) -> undefined.
|
||||||
|
|
||||||
desc("creation_opts") ->
|
desc("creation_opts") ->
|
||||||
|
?DESC("creation_opts");
|
||||||
|
desc("creation_opts_sync_only") ->
|
||||||
?DESC("creation_opts").
|
?DESC("creation_opts").
|
||||||
|
|
|
@ -39,7 +39,7 @@ fields("config") ->
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
|
||||||
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
|
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
|
||||||
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
|
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
|
||||||
] ++ fields("resource_opts");
|
] ++ emqx_resource_schema:fields("resource_opts_sync_only");
|
||||||
fields(mongodb_rs) ->
|
fields(mongodb_rs) ->
|
||||||
emqx_connector_mongo:fields(rs) ++ fields("config");
|
emqx_connector_mongo:fields(rs) ++ fields("config");
|
||||||
fields(mongodb_sharded) ->
|
fields(mongodb_sharded) ->
|
||||||
|
@ -69,32 +69,7 @@ fields("get_sharded") ->
|
||||||
fields("get_single") ->
|
fields("get_single") ->
|
||||||
emqx_bridge_schema:status_fields() ++
|
emqx_bridge_schema:status_fields() ++
|
||||||
fields(mongodb_single) ++
|
fields(mongodb_single) ++
|
||||||
type_and_name_fields(mongodb_single);
|
type_and_name_fields(mongodb_single).
|
||||||
fields("creation_opts") ->
|
|
||||||
lists:map(
|
|
||||||
fun
|
|
||||||
({query_mode, _FieldSchema}) ->
|
|
||||||
{query_mode,
|
|
||||||
mk(
|
|
||||||
enum([sync, async]),
|
|
||||||
#{
|
|
||||||
desc => ?DESC(emqx_resource_schema, "query_mode"),
|
|
||||||
default => async
|
|
||||||
}
|
|
||||||
)};
|
|
||||||
(Field) ->
|
|
||||||
Field
|
|
||||||
end,
|
|
||||||
emqx_resource_schema:fields("creation_opts")
|
|
||||||
);
|
|
||||||
fields("resource_opts") ->
|
|
||||||
[
|
|
||||||
{resource_opts,
|
|
||||||
mk(
|
|
||||||
ref(?MODULE, "creation_opts"),
|
|
||||||
#{default => #{}, desc => ?DESC(emqx_resource_schema, "resource_opts")}
|
|
||||||
)}
|
|
||||||
].
|
|
||||||
|
|
||||||
conn_bridge_examples(Method) ->
|
conn_bridge_examples(Method) ->
|
||||||
[
|
[
|
||||||
|
|
|
@ -98,8 +98,7 @@ fields("config") ->
|
||||||
(emqx_connector_mysql:fields(config) --
|
(emqx_connector_mysql:fields(config) --
|
||||||
emqx_connector_schema_lib:prepare_statement_fields());
|
emqx_connector_schema_lib:prepare_statement_fields());
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
Opts = emqx_resource_schema:fields("creation_opts"),
|
emqx_resource_schema:fields("creation_opts_sync_only");
|
||||||
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[type_field(), name_field() | fields("config")];
|
[type_field(), name_field() | fields("config")];
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
@ -118,10 +117,6 @@ desc(_) ->
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% internal
|
%% internal
|
||||||
is_hidden_opts(Field) ->
|
|
||||||
lists:member(Field, [
|
|
||||||
async_inflight_window
|
|
||||||
]).
|
|
||||||
|
|
||||||
type_field() ->
|
type_field() ->
|
||||||
{type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.
|
{type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||||
|
|
|
@ -100,8 +100,7 @@ fields("config") ->
|
||||||
(emqx_connector_pgsql:fields(config) --
|
(emqx_connector_pgsql:fields(config) --
|
||||||
emqx_connector_schema_lib:prepare_statement_fields());
|
emqx_connector_schema_lib:prepare_statement_fields());
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
Opts = emqx_resource_schema:fields("creation_opts"),
|
emqx_resource_schema:fields("creation_opts_sync_only");
|
||||||
[O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
|
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
fields("post", pgsql);
|
fields("post", pgsql);
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
@ -122,11 +121,6 @@ desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% internal
|
|
||||||
is_hidden_opts(Field) ->
|
|
||||||
lists:member(Field, [
|
|
||||||
async_inflight_window
|
|
||||||
]).
|
|
||||||
|
|
||||||
type_field(Type) ->
|
type_field(Type) ->
|
||||||
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
{type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
|
||||||
|
|
|
@ -181,10 +181,10 @@ resource_fields(Type) ->
|
||||||
resource_creation_fields("redis_cluster") ->
|
resource_creation_fields("redis_cluster") ->
|
||||||
% TODO
|
% TODO
|
||||||
% Cluster bridge is currently incompatible with batching.
|
% Cluster bridge is currently incompatible with batching.
|
||||||
Fields = emqx_resource_schema:fields("creation_opts"),
|
Fields = emqx_resource_schema:fields("creation_opts_sync_only"),
|
||||||
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]);
|
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]);
|
||||||
resource_creation_fields(_) ->
|
resource_creation_fields(_) ->
|
||||||
emqx_resource_schema:fields("creation_opts").
|
emqx_resource_schema:fields("creation_opts_sync_only").
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("desc_config");
|
?DESC("desc_config");
|
||||||
|
|
Loading…
Reference in New Issue