refactor(resource): add resource_opts level into config structure

Shawn 2022-08-15 09:22:54 +08:00
parent d1de262f31
commit 19d85d485b
11 changed files with 107 additions and 67 deletions
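
In short, the creation options that used to sit flat at the top level of a bridge/resource config are now grouped under a single resource_opts object. A hedged sketch of the shape change (the option names come from emqx_resource_schema; the enable key and the values are illustrative only):

%% before: creation options mixed into the top level of the config map
#{enable => true, health_check_interval => 15000, query_mode => async}
%% after: the same options nested one level down under resource_opts
#{enable => true, resource_opts => #{health_check_interval => 15000, query_mode => async}}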


@@ -3,7 +3,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
--import(hoconsc, [mk/2, enum/1]).
+-import(hoconsc, [mk/2, enum/1, ref/2]).
 -export([roots/0, fields/1, namespace/0, desc/1]).
@@ -23,7 +23,14 @@ fields("post") ->
 fields("put") ->
     fields("config");
 fields("get") ->
-    emqx_bridge_schema:metrics_status_fields() ++ fields("post").
+    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
+fields("creation_opts") ->
+    lists:filter(
+        fun({K, _V}) ->
+            not lists:member(K, unsupported_opts())
+        end,
+        emqx_resource_schema:fields("creation_opts")
+    ).
 
 desc("config") ->
     ?DESC("desc_config");
@@ -117,13 +124,17 @@ request_config() ->
     ].
 
 webhook_creation_opts() ->
-    Opts = emqx_resource_schema:fields(creation_opts),
-    lists:filter(
-        fun({K, _V}) ->
-            not lists:member(K, unsupported_opts())
-        end,
-        Opts
-    ).
+    [
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+                }
+            )}
+    ].
 
 unsupported_opts() ->
     [


@@ -110,9 +110,14 @@ on_start(
         {auto_reconnect, reconn_interval(AutoReconn)},
         {pool_size, PoolSize}
     ],
+    SqlTmpl = emqx_map_lib:deep_get([egress, sql_template], Config, undefined),
+    SqlTmplParts = emqx_connector_utils:split_insert_sql(SqlTmpl),
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     Prepares = parse_prepare_sql(Config),
-    State = maps:merge(#{poolname => PoolName, auto_reconnect => AutoReconn}, Prepares),
+    State0 = #{
+        poolname => PoolName, auto_reconnect => AutoReconn, sql_template_parts => SqlTmplParts
+    },
+    State = maps:merge(State0, Prepares),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
         ok -> {ok, init_prepare(State)};
         {error, Reason} -> {error, Reason}
@@ -136,18 +141,17 @@ on_query(
 ) ->
     LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
     ?TRACE("QUERY", "mysql_connector_received", LogMeta),
-    Worker = ecpool:get_client(PoolName),
-    {ok, Conn} = ecpool_worker:client(Worker),
-    MySqlFunction = mysql_function(TypeOrKey),
-    {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
-    Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]),
+    {ok, Conn} = ecpool_worker:client(ecpool:get_client(PoolName)),
+    MySqlFun = mysql_function(TypeOrKey),
+    {SQLOrKey2, SqlParams} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
+    Result = mysql:MySqlFun(Conn, SQLOrKey2, SqlParams, Timeout),
     case Result of
         {error, disconnected} ->
             ?SLOG(
                 error,
                 LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
             ),
-            %% kill the poll worker to trigger reconnection
+            %% kill the ecpool worker to trigger reconnection
            _ = exit(Conn, restart),
            Result;
        {error, not_prepared} ->
@@ -182,7 +186,7 @@ mysql_function(prepared_query) ->
     execute;
 %% for bridge
 mysql_function(_) ->
-    mysql_function(prepared_query).
+    execute.
 
 on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
     case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
@@ -328,10 +332,10 @@ proc_sql_params(query, SQLOrKey, Params, _State) ->
     {SQLOrKey, Params};
 proc_sql_params(prepared_query, SQLOrKey, Params, _State) ->
     {SQLOrKey, Params};
-proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
-    case maps:get(TypeOrKey, ParamsTokens, undefined) of
+proc_sql_params(PreparedKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) ->
+    case maps:get(PreparedKey, ParamsTokens, undefined) of
         undefined ->
             {SQLOrData, Params};
         Tokens ->
-            {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
+            {PreparedKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
     end.
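
A small behavioural note on the mysql_function/1 hunk above, as a sketch written as if inside the MySQL connector module (mysql_function/1 is module-internal; send_message stands in for whatever prepared-statement key a bridge installs and is not a name taken from this diff):

mysql_dispatch_example() ->
    %% the generic prepared_query type maps to mysql:execute/4 ...
    execute = mysql_function(prepared_query),
    %% ... and so does a bridge-installed key, via the catch-all clause that
    %% now returns execute directly instead of recursing (same result)
    execute = mysql_function(send_message),
    ok.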


@@ -0,0 +1,19 @@
+-module(emqx_connector_utils).
+
+-export([split_insert_sql/1]).
+
+%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
+split_insert_sql(SQL) ->
+    case re:split(SQL, "((?i)values)", [{return, binary}]) of
+        [Part1, _, Part3] ->
+            case string:trim(Part1, leading) of
+                <<"insert", _/binary>> = InsertSQL ->
+                    {ok, {InsertSQL, Part3}};
+                <<"INSERT", _/binary>> = InsertSQL ->
+                    {ok, {InsertSQL, Part3}};
+                _ ->
+                    {error, not_insert_sql}
+            end;
+        _ ->
+            {error, not_insert_sql}
+    end.
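
For reference, how the helper above behaves on the sample SQL from its own comment (a hedged sketch; the exact whitespace around the two parts follows from re:split/3 splitting around the matched VALUES keyword):

split_insert_sql_example() ->
    SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>,
    %% the statement is split at the (case-insensitive) VALUES keyword
    {ok, {<<"INSERT INTO \"abc\" (c1,c2,c3) ">>, <<" (${1}, ${1}, ${1})">>}} =
        emqx_connector_utils:split_insert_sql(SQL),
    %% anything that is not an INSERT ... VALUES ... statement is rejected
    {error, not_insert_sql} = emqx_connector_utils:split_insert_sql(<<"SELECT 1">>),
    ok.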


@@ -1,4 +1,14 @@
 emqx_resource_schema {
+    resource_opts {
+        desc {
+            en: """Resource options."""
+            zh: """资源相关的选项。"""
+        }
+        label {
+            en: """Resource Options"""
+            zh: """资源选项"""
+        }
+    }
 
     health_check_interval {
         desc {
@@ -86,17 +96,6 @@ The auto restart interval after the resource is disconnected, in milliseconds.
         }
     }
 
-    resume_interval {
-        desc {
-            en: """Resume time interval when resource down."""
-            zh: """资源不可用时的重试时间"""
-        }
-        label {
-            en: """resume_interval"""
-            zh: """恢复时间"""
-        }
-    }
-
     async_inflight_window {
         desc {
             en: """Async query inflight window."""


@@ -66,7 +66,6 @@
     enable_queue => boolean(),
     queue_max_bytes => integer(),
     query_mode => async | sync | dynamic,
-    resume_interval => integer(),
     async_inflight_window => integer()
 }.
 -type query_result() ::
@@ -81,7 +80,6 @@
 -define(DEFAULT_INFLIGHT, 100).
 -define(HEALTHCHECK_INTERVAL, 15000).
 -define(HEALTHCHECK_INTERVAL_RAW, <<"15s">>).
--define(RESUME_INTERVAL, 15000).
 -define(START_AFTER_CREATED, true).
 -define(START_TIMEOUT, 5000).
 -define(START_TIMEOUT_RAW, <<"5s">>).


@@ -280,21 +280,7 @@ get_instance(ResId) ->
 
 -spec fetch_creation_opts(map()) -> creation_opts().
 fetch_creation_opts(Opts) ->
-    SupportedOpts = [
-        health_check_interval,
-        start_timeout,
-        start_after_created,
-        auto_restart_interval,
-        enable_batch,
-        batch_size,
-        batch_time,
-        enable_queue,
-        queue_max_bytes,
-        query_mode,
-        resume_interval,
-        async_inflight_window
-    ],
-    maps:with(SupportedOpts, Opts).
+    maps:get(resource_opts, Opts).
 
 -spec list_instances() -> [resource_id()].
 list_instances() ->
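
A usage sketch for the simplified accessor (assuming this is the exported emqx_resource:fetch_creation_opts/1; the inner keys and values are illustrative):

fetch_creation_opts_example() ->
    Config = #{
        enable => true,
        resource_opts => #{health_check_interval => 15000, query_mode => sync}
    },
    %% no more cherry-picking supported keys with maps:with/2; the whole
    %% nested resource_opts map is returned as the creation options
    #{health_check_interval := 15000, query_mode := sync} =
        emqx_resource:fetch_creation_opts(Config),
    ok.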


@@ -59,11 +59,6 @@
 -define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}).
 -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
 
--define(RESOURCE_ERROR(Reason, Msg),
-    {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}}
-).
--define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
-
 -type id() :: binary().
 -type query() :: {query, from(), request()}.
 -type request() :: term().
@@ -140,7 +135,7 @@ init({Id, Index, Opts}) ->
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
-        resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL),
+        resume_interval => maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
         acc => [],
         acc_left => BatchSize,
         tref => undefined


@@ -30,14 +30,25 @@ namespace() -> "resource_schema".
 
 roots() -> [].
 
-fields('creation_opts') ->
+fields("resource_opts") ->
+    [
+        {resource_opts,
+            mk(
+                ref(?MODULE, "creation_opts"),
+                #{
+                    required => false,
+                    default => #{},
+                    desc => ?DESC(<<"resource_opts">>)
+                }
+            )}
+    ];
+fields("creation_opts") ->
     [
         {health_check_interval, fun health_check_interval/1},
         {start_after_created, fun start_after_created/1},
         {start_timeout, fun start_timeout/1},
         {auto_restart_interval, fun auto_restart_interval/1},
         {query_mode, fun query_mode/1},
-        {resume_interval, fun resume_interval/1},
         {async_inflight_window, fun async_inflight_window/1},
         {enable_batch, fun enable_batch/1},
         {batch_size, fun batch_size/1},
@@ -88,12 +99,6 @@ enable_queue(default) -> false;
 enable_queue(desc) -> ?DESC("enable_queue");
 enable_queue(_) -> undefined.
 
-resume_interval(type) -> emqx_schema:duration_ms();
-resume_interval(desc) -> ?DESC("resume_interval");
-resume_interval(default) -> ?RESUME_INTERVAL;
-resume_interval(required) -> false;
-resume_interval(_) -> undefined.
-
 async_inflight_window(type) -> pos_integer();
 async_inflight_window(desc) -> ?DESC("async_inflight_window");
 async_inflight_window(default) -> ?DEFAULT_INFLIGHT;
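
For context on how the new wrapper is meant to be consumed (sketch only; my_bridge_config_fields/1 is a hypothetical caller, but the pattern matches the bridge schema changes further down in this commit):

%% fields("resource_opts") is a one-element field list keyed by resource_opts,
%% pointing at the shared "creation_opts" sub-schema, so a bridge schema simply
%% appends it to its own fields
my_bridge_config_fields(OwnFields) ->
    [{resource_opts, _Schema}] = emqx_resource_schema:fields("resource_opts"),
    OwnFields ++ emqx_resource_schema:fields("resource_opts").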


@@ -1,4 +1,25 @@
 emqx_ee_bridge_mysql {
+    local_topic {
+        desc {
+            en: """
+The MQTT topic filter to be forwarded to MySQL. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.</br>
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded.
+"""
+            zh: """
+发送到 'local_topic' 的消息都会转发到 MySQL。 </br>
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
+"""
+        }
+        label {
+            en: "Local Topic"
+            zh: "本地 Topic"
+        }
+    }
 
     sql_template {
         desc {
             en: """SQL Template"""


@@ -91,7 +91,7 @@ fields(basic) ->
         {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
         {write_syntax, fun write_syntax/1}
     ] ++
-        emqx_resource_schema:fields('creation_opts');
+        emqx_resource_schema:fields("resource_opts");
 fields("post_udp") ->
     method_fileds(post, influxdb_udp);
 fields("post_api_v1") ->


@@ -43,14 +43,15 @@ values(get) ->
 values(post) ->
     #{
         type => mysql,
-        name => <<"mysql">>,
+        name => <<"foo">>,
+        local_topic => <<"local/topic/#">>,
         sql_template => ?DEFAULT_SQL,
         connector => #{
             server => <<"127.0.0.1:3306">>,
             database => <<"test">>,
             pool_size => 8,
             username => <<"root">>,
-            password => <<"public">>,
+            password => <<"">>,
             auto_reconnect => true
         },
         enable => true,
@@ -61,7 +62,7 @@ values(put) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% Hocon Schema Definitions
-namespace() -> "bridge".
+namespace() -> "bridge_mysql".
 
 roots() -> [].
 
@@ -69,6 +70,7 @@ fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
         {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
+        {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
         {sql_template,
             mk(
                 binary(),
@@ -82,7 +84,7 @@ fields("config") ->
                 desc => ?DESC("desc_connector")
             }
         )}
-    ];
+    ] ++ emqx_resource_schema:fields("resource_opts");
 fields("post") ->
     [type_field(), name_field() | fields("config")];
 fields("put") ->