From 8d8afd1688722a877fcecf43448c93df8e63b753 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 19 Aug 2022 13:16:22 +0800 Subject: [PATCH] feat(bridge): add `on_batch_query` on emqx_connector_mysql --- .../src/emqx_connector_mysql.erl | 154 ++++++++++++++---- .../src/emqx_plugin_libs_rule.erl | 41 ++++- .../src/emqx_ee_bridge_mysql.erl | 16 +- 3 files changed, 176 insertions(+), 35 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index cae07433a..bedd09267 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -28,6 +28,7 @@ on_start/2, on_stop/2, on_query/3, + on_batch_query/3, on_get_status/2 ]). @@ -47,12 +48,15 @@ -type prepares() :: #{atom() => binary()}. -type params_tokens() :: #{atom() => list()}. +-type sqls() :: #{atom() => binary()}. -type state() :: #{ poolname := atom(), - prepare_statement := prepares(), auto_reconnect := boolean(), - params_tokens := params_tokens() + prepare_statement := prepares(), + params_tokens := params_tokens(), + batch_inserts := sqls(), + batch_params_tokens := params_tokens() }. %%===================================================================== @@ -134,48 +138,46 @@ on_query( {TypeOrKey, SQLOrKey, Params, Timeout}, #{poolname := PoolName, prepare_statement := Prepares} = State ) -> - LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, - ?TRACE("QUERY", "mysql_connector_received", LogMeta), - Worker = ecpool:get_client(PoolName), - {ok, Conn} = ecpool_worker:client(Worker), MySqlFunction = mysql_function(TypeOrKey), {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), - Result = erlang:apply(mysql, MySqlFunction, [Conn, SQLOrKey2, Data, Timeout]), - case Result of - {error, disconnected} -> - ?SLOG( - error, - LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} - ), - %% kill the poll worker to trigger reconnection - _ = exit(Conn, restart), - Result; + case on_sql_query(InstId, MySqlFunction, SQLOrKey2, Data, Timeout, State) of {error, not_prepared} -> - ?SLOG( - warning, - LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} - ), case prepare_sql(Prepares, PoolName) of ok -> %% not return result, next loop will try again on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State); {error, Reason} -> + LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?SLOG( error, LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason} ), {error, Reason} end; - {error, Reason} -> - ?SLOG( - error, - LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} - ), - Result; - _ -> + Result -> Result end. +on_batch_query( + InstId, + BatchReq, + #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State +) -> + case hd(BatchReq) of + {Key, _} -> + case maps:get(Key, Inserts, undefined) of + undefined -> + {error, batch_select_not_implemented}; + InsertSQL -> + Tokens = maps:get(Key, ParamsTokens), + on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State) + end; + Request -> + LogMeta = #{connector => InstId, first_request => Request, state => State}, + ?SLOG(error, LogMeta#{msg => "invalid request"}), + {error, invald_request} + end. + mysql_function(sql) -> query; mysql_function(prepared_query) -> @@ -316,13 +318,44 @@ parse_prepare_sql(Config) -> Any -> Any end, - parse_prepare_sql(maps:to_list(SQL), #{}, #{}). + parse_prepare_sql(maps:to_list(SQL), #{}, #{}, #{}, #{}). -parse_prepare_sql([{Key, H} | T], SQL, Tokens) -> +parse_prepare_sql([{Key, H} | _] = L, Prepares, Tokens, BatchInserts, BatchTks) -> {PrepareSQL, ParamsTokens} = emqx_plugin_libs_rule:preproc_sql(H), - parse_prepare_sql(T, SQL#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}); -parse_prepare_sql([], SQL, Tokens) -> - #{prepare_statement => SQL, params_tokens => Tokens}. + parse_batch_prepare_sql( + L, Prepares#{Key => PrepareSQL}, Tokens#{Key => ParamsTokens}, BatchInserts, BatchTks + ); +parse_prepare_sql([], Prepares, Tokens, BatchInserts, BatchTks) -> + #{ + prepare_statement => Prepares, + params_tokens => Tokens, + batch_inserts => BatchInserts, + batch_params_tokens => BatchTks + }. + +parse_batch_prepare_sql([{Key, H} | T], Prepares, Tokens, BatchInserts, BatchTks) -> + case emqx_plugin_libs_rule:detect_sql_type(H) of + {ok, select} -> + parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks); + {ok, insert} -> + case emqx_plugin_libs_rule:split_insert_sql(H) of + {ok, {InsertSQL, Params}} -> + ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), + parse_prepare_sql( + T, + Prepares, + Tokens, + BatchInserts#{Key => InsertSQL}, + BatchTks#{Key => ParamsTks} + ); + {error, Reason} -> + ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), + parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) + end; + {error, Reason} -> + ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), + parse_prepare_sql(T, Prepares, Tokens, BatchInserts, BatchTks) + end. proc_sql_params(query, SQLOrKey, Params, _State) -> {SQLOrKey, Params}; @@ -335,3 +368,60 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) Tokens -> {TypeOrKey, emqx_plugin_libs_rule:proc_sql(Tokens, SQLOrData)} end. + +on_batch_insert(InstId, BatchReqs, InsertPart, Tokens, State) -> + JoinFun = fun + ([Msg]) -> + emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg); + ([H | T]) -> + lists:foldl( + fun(Msg, Acc) -> + Value = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Msg), + <> + end, + emqx_plugin_libs_rule:proc_sql_param_str(Tokens, H), + T + ) + end, + {_, Msgs} = lists:unzip(BatchReqs), + JoinPart = JoinFun(Msgs), + SQL = <>, + on_sql_query(InstId, query, SQL, [], default_timeout, State). + +on_sql_query( + InstId, + SQLFunc, + SQLOrKey, + Data, + Timeout, + #{poolname := PoolName} = State +) -> + LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, + ?TRACE("QUERY", "mysql_connector_received", LogMeta), + Worker = ecpool:get_client(PoolName), + {ok, Conn} = ecpool_worker:client(Worker), + Result = erlang:apply(mysql, SQLFunc, [Conn, SQLOrKey, Data, Timeout]), + case Result of + {error, disconnected} -> + ?SLOG( + error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => disconnected} + ), + %% kill the poll worker to trigger reconnection + _ = exit(Conn, restart), + Result; + {error, not_prepared} = Error -> + ?SLOG( + warning, + LogMeta#{msg => "mysql_connector_prepare_query_failed", reason => not_prepared} + ), + Error; + {error, Reason} -> + ?SLOG( + error, + LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} + ), + Result; + _ -> + Result + end. diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 03304c209..e94d62b53 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -29,7 +29,9 @@ preproc_sql/2, proc_sql/2, proc_sql_param_str/2, - proc_cql_param_str/2 + proc_cql_param_str/2, + split_insert_sql/1, + detect_sql_type/1 ]). %% type converting @@ -123,6 +125,43 @@ proc_sql_param_str(Tokens, Data) -> proc_cql_param_str(Tokens, Data) -> emqx_placeholder:proc_cql_param_str(Tokens, Data). +%% SQL = <<"INSERT INTO \"abc\" (c1,c2,c3) VALUES (${1}, ${1}, ${1})">> +-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when + InsertSQL :: binary(), + Params :: binary(). +split_insert_sql(SQL) -> + case re:split(SQL, "((?i)values)", [{return, binary}]) of + [Part1, _, Part3] -> + case string:trim(Part1, leading) of + <<"insert", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + <<"INSERT", _/binary>> = InsertSQL -> + {ok, {InsertSQL, Part3}}; + _ -> + {error, not_insert_sql} + end; + _ -> + {error, not_insert_sql} + end. + +-spec detect_sql_type(binary()) -> {ok, Type} | {error, atom()} when + Type :: insert | select. +detect_sql_type(SQL) -> + case re:run(SQL, "^\\s*([a-zA-Z]+)", [{capture, all_but_first, list}]) of + {match, [First]} -> + Types = [select, insert], + PropTypes = [{erlang:atom_to_list(Type), Type} || Type <- Types], + LowFirst = string:lowercase(First), + case proplists:lookup(LowFirst, PropTypes) of + {LowFirst, Type} -> + {ok, Type}; + _ -> + {error, invalid_sql} + end; + _ -> + {error, invalid_sql} + end. + unsafe_atom_key(Key) when is_atom(Key) -> Key; unsafe_atom_key(Key) when is_binary(Key) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index c9b611a52..5f6db77e8 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -6,6 +6,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include("emqx_ee_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -44,7 +45,6 @@ values(post) -> #{ type => mysql, name => <<"foo">>, - local_topic => <<"local/topic/#">>, sql_template => ?DEFAULT_SQL, connector => #{ server => <<"127.0.0.1:3306">>, @@ -54,6 +54,19 @@ values(post) -> password => <<"">>, auto_reconnect => true }, + resource_opts => #{ + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + start_after_created => ?START_AFTER_CREATED, + start_timeout => ?START_TIMEOUT_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + query_mode => sync, + async_inflight_window => ?DEFAULT_INFLIGHT, + enable_batch => false, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + enable_queue => false, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + }, enable => true, direction => egress }; @@ -70,7 +83,6 @@ fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, - {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {sql_template, mk( binary(),