diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 876743be5..5644d0446 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -208,8 +208,8 @@ execute(Conn, Query, Opts) -> do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> Queries = aggregate_query(Tokens, BatchReqs), - SQL = lists:foldl( - fun({InsertPart, Values}, Acc) -> + SQL = maps:fold( + fun(InsertPart, Values, Acc) -> lists:foldl( fun(ValuePart, IAcc) -> <> @@ -224,17 +224,15 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> execute(Conn, SQL, Opts). aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> - maps:to_list( - lists:foldl( - fun({_, Data}, Acc) -> - InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), - Values = maps:get(InsertPart, Acc, []), - maps:put(InsertPart, [ParamsPart | Values], Acc) - end, - #{}, - BatchReqs - ) + lists:foldl( + fun({_, Data}, Acc) -> + InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), + ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), + Values = maps:get(InsertPart, Acc, []), + maps:put(InsertPart, [ParamsPart | Values], Acc) + end, + #{}, + BatchReqs ). connect(Opts) -> @@ -268,8 +266,8 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> InsertTksMap#{Key => InsertTks}, BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} ); - _ -> - ?SLOG(error, #{msg => "split sql failed", sql => H}), + Result -> + ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; {error, Reason} -> @@ -287,11 +285,14 @@ to_bin(List) when is_list(List) -> split_insert_sql(SQL0) -> SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), - lists:foldr( - fun - (<<>>, Acc) -> Acc; - (E, Acc) -> [string:trim(E) | Acc] + lists:filtermap( + fun(E) -> + case string:trim(E) of + <<>> -> + false; + E1 -> + {true, E1} + end end, - [], re:split(SQL, "(?i)(insert into)|(?i)(values)") ). diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 00a887852..3d06aee52 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -594,7 +594,7 @@ t_batch_insert(Config) -> Size = 5, Ts = erlang:system_time(millisecond), - {_, {ok, #{result := Result}}} = + {_, {ok, #{result := _Result}}} = ?wait_async_action( lists:foreach( fun(Idx) -> @@ -609,11 +609,13 @@ t_batch_insert(Config) -> 2_000 ), - timer:sleep(200), - - ?assertMatch( - [[Size]], - connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ?retry( + _Sleep = 50, + _Attempts = 30, + ?assertMatch( + [[Size]], + connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ) ). t_auto_create_simple_insert(Config0) -> @@ -660,7 +662,7 @@ t_auto_create_batch_insert(Config0) -> Size2 = 3, Ts = erlang:system_time(millisecond), - {_, {ok, #{result := Result}}} = + {_, {ok, #{result := _Result}}} = ?wait_async_action( lists:foreach( fun({Offset, ClientId, Size}) -> @@ -683,16 +685,23 @@ t_auto_create_batch_insert(Config0) -> 2_000 ), - timer:sleep(200), + ?retry( + _Sleep = 50, + _Attempts = 30, - ?assertMatch( - [[Size1]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ?assertMatch( + [[Size1]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ) ), - ?assertMatch( - [[Size2]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ?retry( + 50, + 30, + ?assertMatch( + [[Size2]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ) ), ?assertMatch( diff --git a/changes/ee/fix-10738.en.md b/changes/ee/fix-10738.en.md index e2fa14bfc..203fb5823 100644 --- a/changes/ee/fix-10738.en.md +++ b/changes/ee/fix-10738.en.md @@ -1,3 +1,3 @@ -Add supports for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge. +Add support for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge. Before this fix, an insert with a supertable in the template will fail, like this: `insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.