diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 46a70e8b6..876743be5 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -25,7 +25,7 @@ on_get_status/2 ]). --export([connect/1, do_get_status/1, execute/3]). +-export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -124,32 +124,36 @@ on_stop(InstanceId, #{pool_name := PoolName}) -> on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); -on_query(InstanceId, Request, State) -> - %% because the `emqx-tdengine` client only supports a single SQL cmd - %% so the `on_query` and `on_batch_query` have the same process, that is: - %% we need to collect all data into one SQL cmd and then call the insert API - on_batch_query(InstanceId, [Request], State). - -on_batch_query( - InstanceId, - BatchReq, - #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State -) -> - case hd(BatchReq) of - {Key, _} -> - case maps:get(Key, Inserts, undefined) of - undefined -> - {error, {unrecoverable_error, batch_prepare_not_implemented}}; - InsertSQL -> - Tokens = maps:get(Key, ParamsTokens), - do_batch_insert(InstanceId, BatchReq, InsertSQL, Tokens, State) - end; - Request -> - LogMeta = #{connector => InstanceId, first_request => Request, state => State}, - ?SLOG(error, LogMeta#{msg => "invalid request"}), +on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> + case maps:find(Key, InsertTksMap) of + {ok, Tokens} -> + SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data), + do_query(InstanceId, SQL, State); + _ -> {error, {unrecoverable_error, invalid_request}} end. 
+%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process +on_batch_query( + InstanceId, + [{Key, _} | _] = BatchReq, + #{batch_tokens := BatchTksMap, query_opts := Opts} = State +) -> + case maps:find(Key, BatchTksMap) of + {ok, Tokens} -> + do_query_job( + InstanceId, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, + State + ); + _ -> + {error, {unrecoverable_error, batch_prepare_not_implemented}} + end; +on_batch_query(InstanceId, BatchReq, State) -> + LogMeta = #{connector => InstanceId, request => BatchReq, state => State}, + ?SLOG(error, LogMeta#{msg => "invalid request"}), + {error, {unrecoverable_error, invalid_request}}. + on_get_status(_InstanceId, #{pool_name := PoolName}) -> Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), status_result(Health). @@ -167,17 +171,16 @@ status_result(_Status = false) -> connecting. %% Helper fns %%======================================================================================== -do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) -> - SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens), - do_query(InstanceId, SQL, State). +do_query(InstanceId, Query, #{query_opts := Opts} = State) -> + do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State). 
-do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) ->
+do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
     ?TRACE(
         "QUERY",
         "tdengine_connector_received",
-        #{connector => InstanceId, query => Query, state => State}
+        #{connector => InstanceId, job => Job, state => State}
     ),
-    Result = ecpool:pick_and_do(PoolName, {?MODULE, execute, [Query, Opts]}, no_handover),
+    Result = ecpool:pick_and_do(PoolName, Job, no_handover),

     case Result of
         {error, Reason} ->
@@ -188,7 +191,7 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
             ?SLOG(error, #{
                 msg => "tdengine_connector_do_query_failed",
                 connector => InstanceId,
-                query => Query,
+                job => Job,
                 reason => Reason
             }),
             Result;
@@ -203,6 +206,37 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
 execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).

+do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
+    Queries = aggregate_query(Tokens, BatchReqs),
+    SQL = lists:foldl(
+        fun({InsertPart, Values}, Acc) ->
+            lists:foldl(
+                fun(ValuePart, IAcc) ->
+                    <<IAcc/binary, " ", ValuePart/binary>>
+                end,
+                <<Acc/binary, " ", InsertPart/binary, " VALUES">>,
+                Values
+            )
+        end,
+        <<"INSERT INTO">>,
+        Queries
+    ),
+    execute(Conn, SQL, Opts).
+
+aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
+    maps:to_list(
+        lists:foldl(
+            fun({_, Data}, Acc) ->
+                InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
+                ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data),
+                Values = maps:get(InsertPart, Acc, []),
+                maps:put(InsertPart, [ParamsPart | Values], Acc)
+            end,
+            #{},
+            BatchReqs
+        )
+    ).
+
 connect(Opts) ->
     tdengine:start_link(Opts).

@@ -218,32 +252,46 @@ parse_prepare_sql(Config) ->
     parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
-parse_batch_prepare_sql([{Key, H} | T], BatchInserts, BatchTks) -> +parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> case emqx_plugin_libs_rule:detect_sql_type(H) of {ok, select} -> - parse_batch_prepare_sql(T, BatchInserts, BatchTks); + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of - {ok, {InsertSQL, Params}} -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), + InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H), + H1 = string:trim(H, trailing, ";"), + case split_insert_sql(H1) of + [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> + InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart), + ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart), parse_batch_prepare_sql( T, - BatchInserts#{Key => InsertSQL}, - BatchTks#{Key => ParamsTks} + InsertTksMap#{Key => InsertTks}, + BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} ); - {error, Reason} -> - ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), - parse_batch_prepare_sql(T, BatchInserts, BatchTks) + _ -> + ?SLOG(error, #{msg => "split sql failed", sql => H}), + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), - parse_batch_prepare_sql(T, BatchInserts, BatchTks) + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; -parse_batch_prepare_sql([], BatchInserts, BatchTks) -> +parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) -> #{ - batch_inserts => BatchInserts, - batch_params_tokens => BatchTks + insert_tokens => InsertTksMap, + batch_tokens => BatchTksMap }. to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8). 
+
+split_insert_sql(SQL0) ->
+    SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
+    lists:foldr(
+        fun
+            (<<>>, Acc) -> Acc;
+            (E, Acc) -> [string:trim(E) | Acc]
+        end,
+        [],
+        re:split(SQL, "(?i)(insert into)|(?i)(values)")
+    ).
diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl
index 9a4c01a2b..3bfac1ec4 100644
--- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl
+++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl
@@ -32,7 +32,8 @@
     proc_cql_param_str/2,
     split_insert_sql/1,
     detect_sql_type/1,
-    proc_batch_sql/3
+    proc_batch_sql/3,
+    formalize_sql/1
 ]).

 %% type converting
@@ -126,7 +127,8 @@ proc_cql_param_str(Tokens, Data) ->
 -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
     InsertSQL :: binary(),
     Params :: binary().
-split_insert_sql(SQL) ->
+split_insert_sql(SQL0) ->
+    SQL = formalize_sql(SQL0),
     case re:split(SQL, "((?i)values)", [{return, binary}]) of
         [Part1, _, Part3] ->
             case string:trim(Part1, leading) of
@@ -173,6 +175,12 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
     ),
     <<InsertPart/binary, " values ", BinTuples/binary>>.

+formalize_sql(Input) ->
+    %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char.
+    SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]),
+    %% 2. trims the result
+    string:trim(SQL).
+
 unsafe_atom_key(Key) when is_atom(Key) -> Key;
 unsafe_atom_key(Key) when is_binary(Key) ->