Merge pull request #8786 from terry-xiaoyu/refactor_configs_apis_mysql_bridge

refactor: configs and APIs for mysql bridge
This commit is contained in:
Xinyu Liu 2022-08-24 10:55:09 +08:00 committed by GitHub
commit 019ab4501d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 59 deletions

View File

@ -309,11 +309,9 @@ parse_prepare_sql(Config) ->
SQL = SQL =
case maps:get(prepare_statement, Config, undefined) of case maps:get(prepare_statement, Config, undefined) of
undefined -> undefined ->
case emqx_map_lib:deep_get([egress, sql_template], Config, undefined) of case maps:get(sql, Config, undefined) of
undefined -> undefined -> #{};
#{}; Template -> #{send_message => Template}
Template ->
#{send_message => Template}
end; end;
Any -> Any ->
Any Any

View File

@ -38,16 +38,6 @@ will be forwarded.
zh: "启用/禁用桥接" zh: "启用/禁用桥接"
} }
} }
config_direction {
desc {
en: """The direction of this bridge, MUST be 'egress'"""
zh: """桥接的方向, 必须是 egress"""
}
label {
en: "Bridge Direction"
zh: "桥接方向"
}
}
desc_config { desc_config {
desc { desc {
@ -81,14 +71,4 @@ will be forwarded.
zh: "桥接名字" zh: "桥接名字"
} }
} }
desc_connector {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置。"
}
}
} }

View File

@ -43,17 +43,17 @@ values(get) ->
maps:merge(values(post), ?METRICS_EXAMPLE); maps:merge(values(post), ?METRICS_EXAMPLE);
values(post) -> values(post) ->
#{ #{
enable => true,
type => mysql, type => mysql,
name => <<"foo">>, name => <<"foo">>,
sql_template => ?DEFAULT_SQL,
connector => #{
server => <<"127.0.0.1:3306">>, server => <<"127.0.0.1:3306">>,
database => <<"test">>, database => <<"test">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"">>, password => <<"">>,
auto_reconnect => true auto_reconnect => true,
}, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
@ -62,9 +62,7 @@ values(post) ->
batch_time => ?DEFAULT_BATCH_TIME, batch_time => ?DEFAULT_BATCH_TIME,
enable_queue => false, enable_queue => false,
max_queue_bytes => ?DEFAULT_QUEUE_SIZE max_queue_bytes => ?DEFAULT_QUEUE_SIZE
}, }
enable => true,
direction => egress
}; };
values(put) -> values(put) ->
values(post). values(post).
@ -78,19 +76,15 @@ roots() -> [].
fields("config") -> fields("config") ->
[ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {sql,
{sql_template,
mk( mk(
binary(), binary(),
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
)}, )},
{connector, {local_topic,
mk( mk(
ref(?MODULE, connector), binary(),
#{ #{desc => ?DESC("local_topic"), default => undefined}
required => true,
desc => ?DESC("desc_connector")
}
)}, )},
{resource_opts, {resource_opts,
mk( mk(
@ -101,30 +95,20 @@ fields("config") ->
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
} }
)} )}
]; ] ++
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields();
fields("creation_opts") -> fields("creation_opts") ->
Opts = emqx_resource_schema:fields("creation_opts"), Opts = emqx_resource_schema:fields("creation_opts"),
lists:filter( [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
fun({Field, _}) ->
not lists:member(Field, [
start_after_created, start_timeout, query_mode, async_inflight_window
])
end,
Opts
);
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post"); emqx_bridge_schema:metrics_status_fields() ++ fields("post").
fields(connector) ->
emqx_connector_mysql:fields(config) -- emqx_connector_schema_lib:prepare_statement_fields().
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(connector) ->
?DESC("desc_connector");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for MySQL using `", string:to_upper(Method), "` method."]; ["Configuration for MySQL using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) -> desc("creation_opts" = Name) ->
@ -134,6 +118,11 @@ desc(_) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal %% internal
is_hidden_opts(Field) ->
lists:member(Field, [
query_mode, async_inflight_window
]).
type_field() -> type_field() ->
{type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}. {type, mk(enum([mysql]), #{required => true, desc => ?DESC("desc_type")})}.