fix(tdengine): minor improvement of code and changes
This commit is contained in:
parent
f1a3e5965e
commit
5a08a7b9de
|
@ -208,8 +208,8 @@ execute(Conn, Query, Opts) ->
|
||||||
|
|
||||||
do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
||||||
Queries = aggregate_query(Tokens, BatchReqs),
|
Queries = aggregate_query(Tokens, BatchReqs),
|
||||||
SQL = lists:foldl(
|
SQL = maps:fold(
|
||||||
fun({InsertPart, Values}, Acc) ->
|
fun(InsertPart, Values, Acc) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(ValuePart, IAcc) ->
|
fun(ValuePart, IAcc) ->
|
||||||
<<IAcc/binary, " ", ValuePart/binary>>
|
<<IAcc/binary, " ", ValuePart/binary>>
|
||||||
|
@ -224,7 +224,6 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
|
||||||
execute(Conn, SQL, Opts).
|
execute(Conn, SQL, Opts).
|
||||||
|
|
||||||
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
|
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
|
||||||
maps:to_list(
|
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun({_, Data}, Acc) ->
|
fun({_, Data}, Acc) ->
|
||||||
InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
|
InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
|
||||||
|
@ -234,7 +233,6 @@ aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
|
||||||
end,
|
end,
|
||||||
#{},
|
#{},
|
||||||
BatchReqs
|
BatchReqs
|
||||||
)
|
|
||||||
).
|
).
|
||||||
|
|
||||||
connect(Opts) ->
|
connect(Opts) ->
|
||||||
|
@ -268,8 +266,8 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
|
||||||
InsertTksMap#{Key => InsertTks},
|
InsertTksMap#{Key => InsertTks},
|
||||||
BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
|
BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
|
||||||
);
|
);
|
||||||
_ ->
|
Result ->
|
||||||
?SLOG(error, #{msg => "split sql failed", sql => H}),
|
?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
|
||||||
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
|
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -287,11 +285,14 @@ to_bin(List) when is_list(List) ->
|
||||||
|
|
||||||
split_insert_sql(SQL0) ->
|
split_insert_sql(SQL0) ->
|
||||||
SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
|
SQL = emqx_plugin_libs_rule:formalize_sql(SQL0),
|
||||||
lists:foldr(
|
lists:filtermap(
|
||||||
fun
|
fun(E) ->
|
||||||
(<<>>, Acc) -> Acc;
|
case string:trim(E) of
|
||||||
(E, Acc) -> [string:trim(E) | Acc]
|
<<>> ->
|
||||||
|
false;
|
||||||
|
E1 ->
|
||||||
|
{true, E1}
|
||||||
|
end
|
||||||
end,
|
end,
|
||||||
[],
|
|
||||||
re:split(SQL, "(?i)(insert into)|(?i)(values)")
|
re:split(SQL, "(?i)(insert into)|(?i)(values)")
|
||||||
).
|
).
|
||||||
|
|
|
@ -594,7 +594,7 @@ t_batch_insert(Config) ->
|
||||||
|
|
||||||
Size = 5,
|
Size = 5,
|
||||||
Ts = erlang:system_time(millisecond),
|
Ts = erlang:system_time(millisecond),
|
||||||
{_, {ok, #{result := Result}}} =
|
{_, {ok, #{result := _Result}}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Idx) ->
|
fun(Idx) ->
|
||||||
|
@ -609,11 +609,13 @@ t_batch_insert(Config) ->
|
||||||
2_000
|
2_000
|
||||||
),
|
),
|
||||||
|
|
||||||
timer:sleep(200),
|
?retry(
|
||||||
|
_Sleep = 50,
|
||||||
|
_Attempts = 30,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[[Size]],
|
[[Size]],
|
||||||
connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
|
connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
|
||||||
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
t_auto_create_simple_insert(Config0) ->
|
t_auto_create_simple_insert(Config0) ->
|
||||||
|
@ -660,7 +662,7 @@ t_auto_create_batch_insert(Config0) ->
|
||||||
Size2 = 3,
|
Size2 = 3,
|
||||||
|
|
||||||
Ts = erlang:system_time(millisecond),
|
Ts = erlang:system_time(millisecond),
|
||||||
{_, {ok, #{result := Result}}} =
|
{_, {ok, #{result := _Result}}} =
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun({Offset, ClientId, Size}) ->
|
fun({Offset, ClientId, Size}) ->
|
||||||
|
@ -683,16 +685,23 @@ t_auto_create_batch_insert(Config0) ->
|
||||||
2_000
|
2_000
|
||||||
),
|
),
|
||||||
|
|
||||||
timer:sleep(200),
|
?retry(
|
||||||
|
_Sleep = 50,
|
||||||
|
_Attempts = 30,
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[[Size1]],
|
[[Size1]],
|
||||||
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
|
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
|
||||||
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
|
?retry(
|
||||||
|
50,
|
||||||
|
30,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
[[Size2]],
|
[[Size2]],
|
||||||
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
|
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
|
||||||
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
Add supports for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge.
|
Add support for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge.
|
||||||
Before this fix, an insert with a supertable in the template will fail, like this:
|
Before this fix, an insert with a supertable in the template will fail, like this:
|
||||||
`insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.
|
`insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.
|
||||||
|
|
Loading…
Reference in New Issue