diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 826a03baa..47166f4e1 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -124,7 +124,7 @@ on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> case maps:find(Key, InsertTksMap) of {ok, Tokens} when is_map(Data) -> - SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data), + SQL = emqx_placeholder:proc_tmpl(Tokens, Data), do_query(InstanceId, SQL, State); _ -> {error, {unrecoverable_error, invalid_request}} @@ -209,31 +209,16 @@ execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> - Queries = aggregate_query(Tokens, BatchReqs), - SQL = maps:fold( - fun(InsertPart, Values, Acc) -> - lists:foldl( - fun(ValuePart, IAcc) -> - <> - end, - <>, - Values - ) - end, - <<"INSERT INTO">>, - Queries - ), + SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), execute(Conn, SQL, Opts). -aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> +aggregate_query(BatchTks, BatchReqs, Acc) -> lists:foldl( - fun({_, Data}, Acc) -> - InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data), - Values = maps:get(InsertPart, Acc, []), - maps:put(InsertPart, [ParamsPart | Values], Acc) + fun({_, Data}, InAcc) -> + InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data), + <> end, - #{}, + Acc, BatchReqs ). @@ -260,13 +245,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of - [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> - InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart), - ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart), + [_InsertPart, BatchDesc] -> + BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc), parse_batch_prepare_sql( T, InsertTksMap#{Key => InsertTks}, - BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} + BatchTksMap#{Key => BatchTks} ); Result -> ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), @@ -299,7 +283,7 @@ split_insert_sql(SQL0) -> {true, E1} end end, - re:split(SQL, "(?i)(insert into)|(?i)(values)") + re:split(SQL, "(?i)(insert into)") ). formalize_sql(Input) ->