Merge pull request #8759 from lafirest/feat/mysql_batch_query
feat(bridge): add `on_batch_query` on emqx_connector_mysql
This commit is contained in:
commit
4ad04b646f
|
@ -28,6 +28,7 @@
|
||||||
on_start/2,
|
on_start/2,
|
||||||
on_stop/2,
|
on_stop/2,
|
||||||
on_query/3,
|
on_query/3,
|
||||||
|
on_batch_query/3,
|
||||||
on_get_status/2
|
on_get_status/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -47,12 +48,15 @@
|
||||||
|
|
||||||
-type prepares() :: #{atom() => binary()}.
|
-type prepares() :: #{atom() => binary()}.
|
||||||
-type params_tokens() :: #{atom() => list()}.
|
-type params_tokens() :: #{atom() => list()}.
|
||||||
|
-type sqls() :: #{atom() => binary()}.
|
||||||
-type state() ::
|
-type state() ::
|
||||||
#{
|
#{
|
||||||
poolname := atom(),
|
poolname := atom(),
|
||||||
prepare_statement := prepares(),
|
|
||||||
auto_reconnect := boolean(),
|
auto_reconnect := boolean(),
|
||||||
params_tokens := params_tokens()
|
prepare_statement := prepares(),
|
||||||
|
params_tokens := params_tokens(),
|
||||||
|
batch_inserts := sqls(),
|
||||||
|
batch_params_tokens := params_tokens()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
|
@ -134,48 +138,46 @@ on_query(
|
||||||
{TypeOrKey, SQLOrKey, Params, Timeout},
|
{TypeOrKey, SQLOrKey, Params, Timeout},
|
||||||
#{poolname := PoolName, prepare_statement := Prepares} = State
|
#{poolname := PoolName, prepare_statement := Prepares} = State
|
||||||
) ->
|
) ->
|
||||||
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
|
|
||||||
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
|
||||||
Worker = ecpool:get_client(PoolName),
|
|
||||||
{ok, Conn} = ecpool_worker:client(Worker),
|
|
||||||
MySqlFunction = mysql_function(TypeOrKey),
|
MySqlFunction = mysql_function(TypeOrKey),
|
||||||
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
{SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State),
|
||||||
Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]),
|
case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of
|
||||||
case Result of
|
|
||||||
{error, disconnected} ->
|
|
||||||
?SLOG(
|
|
||||||
error,
|
|
||||||
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
|
|
||||||
),
|
|
||||||
%% kill the poll worker to trigger reconnection
|
|
||||||
_ = exit(Conn, restart),
|
|
||||||
Result;
|
|
||||||
{error, not_prepared} ->
|
{error, not_prepared} ->
|
||||||
?SLOG(
|
|
||||||
warning,
|
|
||||||
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
|
||||||
),
|
|
||||||
case prepare_sql(Prepares, PoolName) of
|
case prepare_sql(Prepares, PoolName) of
|
||||||
ok ->
|
ok ->
|
||||||
%% not return result, next loop will try again
|
%% not return result, next loop will try again
|
||||||
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
|
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
|
||||||
?SLOG(
|
?SLOG(
|
||||||
error,
|
error,
|
||||||
LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
|
LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason}
|
||||||
),
|
),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
Result ->
|
||||||
?SLOG(
|
|
||||||
error,
|
|
||||||
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
|
|
||||||
),
|
|
||||||
Result;
|
|
||||||
_ ->
|
|
||||||
Result
|
Result
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
on_batch_query(
|
||||||
|
InstId,
|
||||||
|
BatchReq,
|
||||||
|
#{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
|
||||||
|
) ->
|
||||||
|
case hd(BatchReq) of
|
||||||
|
{Key, _} ->
|
||||||
|
case maps:get(Key, Inserts, undefined) of
|
||||||
|
undefined ->
|
||||||
|
{error, batch_select_not_implemented};
|
||||||
|
InsertSQL ->
|
||||||
|
Tokens = maps:get(Key, ParamsTokens),
|
||||||
|
on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
|
||||||
|
end;
|
||||||
|
Request ->
|
||||||
|
LogMeta = #{connector => InstId, first_request => Request, state => State},
|
||||||
|
?SLOG(error, LogMeta#{msg => "invalid request"}),
|
||||||
|
{error, invald_request}
|
||||||
|
end.
|
||||||
|
|
||||||
mysql_function(sql) ->
|
mysql_function(sql) ->
|
||||||
query;
|
query;
|
||||||
mysql_function(prepared_query) ->
|
mysql_function(prepared_query) ->
|
||||||
|
@ -316,13 +318,44 @@ parse_prepare_sql(Config) ->
|
||||||
Any ->
|
Any ->
|
||||||
Any
|
Any
|
||||||
end,
|
end,
|
||||||
parse_prepare_sql(maps:to_list(SQL), #{}, #{}).
|
parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}).
|
||||||
|
|
||||||
parse_prepare_sql([{Key, H} | T], SQL, Tokens) ->
|
parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) ->
|
||||||
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
|
{PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H),
|
||||||
parse_prepare_sql(T, SQL#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens});
|
parse_batch_prepare_sql(
|
||||||
parse_prepare_sql([], SQL, Tokens) ->
|
L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks
|
||||||
#{prepare_statement => SQL, params_tokens => Tokens}.
|
);
|
||||||
|
parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) ->
|
||||||
|
#{
|
||||||
|
prepare_statement => Prepares,
|
||||||
|
params_tokens => Tokens,
|
||||||
|
batch_inserts => BatchInserts,
|
||||||
|
batch_params_tokens => BatchTks
|
||||||
|
}.
|
||||||
|
|
||||||
|
parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) ->
|
||||||
|
case emqx_plugin_libs_rule:detect_sql_type(H) of
|
||||||
|
{ok, select} ->
|
||||||
|
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks);
|
||||||
|
{ok, insert} ->
|
||||||
|
case emqx_plugin_libs_rule:split_insert_sql(H) of
|
||||||
|
{ok, {InsertSQL, Params}} ->
|
||||||
|
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params),
|
||||||
|
parse_prepare_sql(
|
||||||
|
T,
|
||||||
|
Prepares,
|
||||||
|
Tokens,
|
||||||
|
BatchInserts#{Key => InsertSQL},
|
||||||
|
BatchTks#{Key => ParamsTks}
|
||||||
|
);
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}),
|
||||||
|
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
|
||||||
|
parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks)
|
||||||
|
end.
|
||||||
|
|
||||||
proc_sql_params(query, SQLOrKey, Params, _State) ->
|
proc_sql_params(query, SQLOrKey, Params, _State) ->
|
||||||
{SQLOrKey, Params};
|
{SQLOrKey, Params};
|
||||||
|
@ -335,3 +368,60 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
|
||||||
Tokens ->
|
Tokens ->
|
||||||
{TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
{TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) ->
|
||||||
|
JoinFun = fun
|
||||||
|
([Msg]) ->
|
||||||
|
emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg);
|
||||||
|
([H | T]) ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(Msg, Acc) ->
|
||||||
|
Value = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg),
|
||||||
|
<<Acc/binary, ", ", Value/binary>>
|
||||||
|
end,
|
||||||
|
emqx_plugin_libs_rule:proc_sql_param_str(Tokens, H),
|
||||||
|
T
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
{_, Msgs} = lists:unzip(BatchReqs),
|
||||||
|
JoinPart = JoinFun(Msgs),
|
||||||
|
SQL = <<InsertPart/binary, " values ", JoinPart/binary>>,
|
||||||
|
on_sql_query(InstId, query, SQL, [], default_timeout, State).
|
||||||
|
|
||||||
|
on_sql_query(
|
||||||
|
InstId,
|
||||||
|
SQLFunc,
|
||||||
|
SQLOrKey,
|
||||||
|
Data,
|
||||||
|
Timeout,
|
||||||
|
#{poolname := PoolName} = State
|
||||||
|
) ->
|
||||||
|
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
|
||||||
|
?TRACE("QUERY", "mysql_connector_received", LogMeta),
|
||||||
|
Worker = ecpool:get_client(PoolName),
|
||||||
|
{ok, Conn} = ecpool_worker:client(Worker),
|
||||||
|
Result = erlang:apply(mysql, SQLFunc, [Conn, SQLOrKey, Data, Timeout]),
|
||||||
|
case Result of
|
||||||
|
{error, disconnected} ->
|
||||||
|
?SLOG(
|
||||||
|
error,
|
||||||
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected}
|
||||||
|
),
|
||||||
|
%% kill the poll worker to trigger reconnection
|
||||||
|
_ = exit(Conn, restart),
|
||||||
|
Result;
|
||||||
|
{error, not_prepared} = Error ->
|
||||||
|
?SLOG(
|
||||||
|
warning,
|
||||||
|
LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared}
|
||||||
|
),
|
||||||
|
Error;
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(
|
||||||
|
error,
|
||||||
|
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
|
||||||
|
),
|
||||||
|
Result;
|
||||||
|
_ ->
|
||||||
|
Result
|
||||||
|
end.
|
||||||
|
|
|
@ -29,7 +29,9 @@
|
||||||
preproc_sql/2,
|
preproc_sql/2,
|
||||||
proc_sql/2,
|
proc_sql/2,
|
||||||
proc_sql_param_str/2,
|
proc_sql_param_str/2,
|
||||||
proc_cql_param_str/2
|
proc_cql_param_str/2,
|
||||||
|
split_insert_sql/1,
|
||||||
|
detect_sql_type/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% type converting
|
%% type converting
|
||||||
|
@ -123,6 +125,43 @@ proc_sql_param_str(Tokens, Data) ->
|
||||||
proc_cql_param_str(Tokens, Data) ->
|
proc_cql_param_str(Tokens, Data) ->
|
||||||
emqx_placeholder:proc_cql_param_str(Tokens, Data).
|
emqx_placeholder:proc_cql_param_str(Tokens, Data).
|
||||||
|
|
||||||
|
%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">>
|
||||||
|
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
|
||||||
|
InsertSQL :: binary(),
|
||||||
|
Params :: binary().
|
||||||
|
split_insert_sql(SQL) ->
|
||||||
|
case re:split(SQL, "((?i)values)", [{return, binary}]) of
|
||||||
|
[Part1, _, Part3] ->
|
||||||
|
case string:trim(Part1, leading) of
|
||||||
|
<<"insert", _/binary>> = InsertSQL ->
|
||||||
|
{ok, {InsertSQL, Part3}};
|
||||||
|
<<"INSERT", _/binary>> = InsertSQL ->
|
||||||
|
{ok, {InsertSQL, Part3}};
|
||||||
|
_ ->
|
||||||
|
{error, not_insert_sql}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, not_insert_sql}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when
|
||||||
|
Type :: insert | select.
|
||||||
|
detect_sql_type(SQL) ->
|
||||||
|
case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of
|
||||||
|
{match, [First]} ->
|
||||||
|
Types = [select, insert],
|
||||||
|
PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types],
|
||||||
|
LowFirst = string:lowercase(First),
|
||||||
|
case proplists:lookup(LowFirst, PropTypes) of
|
||||||
|
{LowFirst, Type} ->
|
||||||
|
{ok, Type};
|
||||||
|
_ ->
|
||||||
|
{error, invalid_sql}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, invalid_sql}
|
||||||
|
end.
|
||||||
|
|
||||||
unsafe_atom_key(Key) when is_atom(Key) ->
|
unsafe_atom_key(Key) when is_atom(Key) ->
|
||||||
Key;
|
Key;
|
||||||
unsafe_atom_key(Key) when is_binary(Key) ->
|
unsafe_atom_key(Key) when is_binary(Key) ->
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
-include("emqx_ee_bridge.hrl").
|
-include("emqx_ee_bridge.hrl").
|
||||||
|
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
@ -44,7 +45,6 @@ values(post) ->
|
||||||
#{
|
#{
|
||||||
type => mysql,
|
type => mysql,
|
||||||
name => <<"foo">>,
|
name => <<"foo">>,
|
||||||
local_topic => <<"local/topic/#">>,
|
|
||||||
sql_template => ?DEFAULT_SQL,
|
sql_template => ?DEFAULT_SQL,
|
||||||
connector => #{
|
connector => #{
|
||||||
server => <<"127.0.0.1:3306">>,
|
server => <<"127.0.0.1:3306">>,
|
||||||
|
@ -54,6 +54,15 @@ values(post) ->
|
||||||
password => <<"">>,
|
password => <<"">>,
|
||||||
auto_reconnect => true
|
auto_reconnect => true
|
||||||
},
|
},
|
||||||
|
resource_opts => #{
|
||||||
|
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
|
||||||
|
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
|
||||||
|
enable_batch => false,
|
||||||
|
batch_size => ?DEFAULT_BATCH_SIZE,
|
||||||
|
batch_time => ?DEFAULT_BATCH_TIME,
|
||||||
|
enable_queue => false,
|
||||||
|
max_queue_bytes => ?DEFAULT_QUEUE_SIZE
|
||||||
|
},
|
||||||
enable => true,
|
enable => true,
|
||||||
direction => egress
|
direction => egress
|
||||||
};
|
};
|
||||||
|
@ -70,7 +79,6 @@ fields("config") ->
|
||||||
[
|
[
|
||||||
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
|
||||||
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
|
||||||
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
|
|
||||||
{sql_template,
|
{sql_template,
|
||||||
mk(
|
mk(
|
||||||
binary(),
|
binary(),
|
||||||
|
@ -83,8 +91,27 @@ fields("config") ->
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("desc_connector")
|
desc => ?DESC("desc_connector")
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{resource_opts,
|
||||||
|
mk(
|
||||||
|
ref(?MODULE, "creation_opts"),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
default => #{},
|
||||||
|
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
] ++ emqx_resource_schema:fields("resource_opts");
|
];
|
||||||
|
fields("creation_opts") ->
|
||||||
|
Opts = emqx_resource_schema:fields("creation_opts"),
|
||||||
|
lists:filter(
|
||||||
|
fun({Field, _}) ->
|
||||||
|
not lists:member(Field, [
|
||||||
|
start_after_created, start_timeout, query_mode, async_inflight_window
|
||||||
|
])
|
||||||
|
end,
|
||||||
|
Opts
|
||||||
|
);
|
||||||
fields("post") ->
|
fields("post") ->
|
||||||
[type_field(), name_field() | fields("config")];
|
[type_field(), name_field() | fields("config")];
|
||||||
fields("put") ->
|
fields("put") ->
|
||||||
|
@ -100,6 +127,8 @@ desc(connector) ->
|
||||||
?DESC("desc_connector");
|
?DESC("desc_connector");
|
||||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||||
["Configuration for MySQL using `", string:to_upper(Method), "` method."];
|
["Configuration for MySQL using `", string:to_upper(Method), "` method."];
|
||||||
|
desc("creation_opts" = Name) ->
|
||||||
|
emqx_resource_schema:desc(Name);
|
||||||
desc(_) ->
|
desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue