From 6ff77b221b5c5abdb5ea6c67511ccd1faf160eb9 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 18 May 2023 14:30:51 +0800 Subject: [PATCH 1/4] fix(tdengine): add supports for the `automatically create` feature in the SQL template --- .../src/emqx_bridge_tdengine_connector.erl | 138 ++++++++++++------ .../src/emqx_plugin_libs_rule.erl | 12 +- 2 files changed, 103 insertions(+), 47 deletions(-) diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 46a70e8b6..876743be5 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -25,7 +25,7 @@ on_get_status/2 ]). --export([connect/1, do_get_status/1, execute/3]). +-export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -124,32 +124,36 @@ on_stop(InstanceId, #{pool_name := PoolName}) -> on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); -on_query(InstanceId, Request, State) -> - %% because the `emqx-tdengine` client only supports a single SQL cmd - %% so the `on_query` and `on_batch_query` have the same process, that is: - %% we need to collect all data into one SQL cmd and then call the insert API - on_batch_query(InstanceId, [Request], State). - -on_batch_query( - InstanceId, - BatchReq, - #{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State -) -> - case hd(BatchReq) of - {Key, _} -> - case maps:get(Key, Inserts, undefined) of - undefined -> - {error, {unrecoverable_error, batch_prepare_not_implemented}}; - InsertSQL -> - Tokens = maps:get(Key, ParamsTokens), - do_batch_insert(InstanceId, BatchReq, InsertSQL, Tokens, State) - end; - Request -> - LogMeta = #{connector => InstanceId, first_request => Request, state => State}, - ?SLOG(error, LogMeta#{msg => "invalid request"}), +on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> + case maps:find(Key, InsertTksMap) of + {ok, Tokens} -> + SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data), + do_query(InstanceId, SQL, State); + _ -> {error, {unrecoverable_error, invalid_request}} end. +%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process +on_batch_query( + InstanceId, + [{Key, _} | _] = BatchReq, + #{batch_tokens := BatchTksMap, query_opts := Opts} = State +) -> + case maps:find(Key, BatchTksMap) of + {ok, Tokens} -> + do_query_job( + InstanceId, + {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, + State + ); + _ -> + {error, {unrecoverable_error, batch_prepare_not_implemented}} + end; +on_batch_query(InstanceId, BatchReq, State) -> + LogMeta = #{connector => InstanceId, request => BatchReq, state => State}, + ?SLOG(error, LogMeta#{msg => "invalid request"}), + {error, {unrecoverable_error, invalid_request}}. + on_get_status(_InstanceId, #{pool_name := PoolName}) -> Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), status_result(Health). @@ -167,17 +171,16 @@ status_result(_Status = false) -> connecting. %% Helper fns %%======================================================================================== -do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) -> - SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens), - do_query(InstanceId, SQL, State). +do_query(InstanceId, Query, #{query_opts := Opts} = State) -> + do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State). -do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) -> +do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> ?TRACE( "QUERY", "tdengine_connector_received", - #{connector => InstanceId, query => Query, state => State} + #{connector => InstanceId, job => Job, state => State} ), - Result = ecpool:pick_and_do(PoolName, {?MODULE, execute, [Query, Opts]}, no_handover), + Result = ecpool:pick_and_do(PoolName, Job, no_handover), case Result of {error, Reason} -> @@ -188,7 +191,7 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State ?SLOG(error, #{ msg => "tdengine_connector_do_query_failed", connector => InstanceId, - query => Query, + job => Job, reason => Reason }), Result; @@ -203,6 +206,37 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). +do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> + Queries = aggregate_query(Tokens, BatchReqs), + SQL = lists:foldl( + fun({InsertPart, Values}, Acc) -> + lists:foldl( + fun(ValuePart, IAcc) -> + <> + end, + <>, + Values + ) + end, + <<"INSERT INTO">>, + Queries + ), + execute(Conn, SQL, Opts). + +aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> + maps:to_list( + lists:foldl( + fun({_, Data}, Acc) -> + InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), + ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), + Values = maps:get(InsertPart, Acc, []), + maps:put(InsertPart, [ParamsPart | Values], Acc) + end, + #{}, + BatchReqs + ) + ). + connect(Opts) -> tdengine:start_link(Opts). @@ -218,32 +252,46 @@ parse_prepare_sql(Config) -> parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). -parse_batch_prepare_sql([{Key, H} | T], BatchInserts, BatchTks) -> +parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> case emqx_plugin_libs_rule:detect_sql_type(H) of {ok, select} -> - parse_batch_prepare_sql(T, BatchInserts, BatchTks); + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); {ok, insert} -> - case emqx_plugin_libs_rule:split_insert_sql(H) of - {ok, {InsertSQL, Params}} -> - ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), + InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H), + H1 = string:trim(H, trailing, ";"), + case split_insert_sql(H1) of + [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> + InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart), + ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart), parse_batch_prepare_sql( T, - BatchInserts#{Key => InsertSQL}, - BatchTks#{Key => ParamsTks} + InsertTksMap#{Key => InsertTks}, + BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} ); - {error, Reason} -> - ?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), - parse_batch_prepare_sql(T, BatchInserts, BatchTks) + _ -> + ?SLOG(error, #{msg => "split sql failed", sql => H}), + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; {error, Reason} -> ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), - parse_batch_prepare_sql(T, BatchInserts, BatchTks) + parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; -parse_batch_prepare_sql([], BatchInserts, BatchTks) -> +parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) -> #{ - batch_inserts => BatchInserts, - batch_params_tokens => BatchTks + insert_tokens => InsertTksMap, + batch_tokens => BatchTksMap }. to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8). + +split_insert_sql(SQL0) -> + SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), + lists:foldr( + fun + (<<>>, Acc) -> Acc; + (E, Acc) -> [string:trim(E) | Acc] + end, + [], + re:split(SQL, "(?i)(insert into)|(?i)(values)") + ). diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 9a4c01a2b..3bfac1ec4 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -32,7 +32,8 @@ proc_cql_param_str/2, split_insert_sql/1, detect_sql_type/1, - proc_batch_sql/3 + proc_batch_sql/3, + formalize_sql/1 ]). %% type converting @@ -126,7 +127,8 @@ proc_cql_param_str(Tokens, Data) -> -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when InsertSQL :: binary(), Params :: binary(). -split_insert_sql(SQL) -> +split_insert_sql(SQL0) -> + SQL = formalize_sql(SQL0), case re:split(SQL, "((?i)values)", [{return, binary}]) of [Part1, _, Part3] -> case string:trim(Part1, leading) of @@ -173,6 +175,12 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) -> ), <>. +formalize_sql(Input) -> + %% 1. replace all whitespaces like '\r' '\n' or spaces to a single space char. + SQL = re:replace(Input, "\\s+", " ", [global, {return, binary}]), + %% 2. trims the result + string:trim(SQL). + unsafe_atom_key(Key) when is_atom(Key) -> Key; unsafe_atom_key(Key) when is_binary(Key) -> From 142125b9e43b6c8ebaecae201bcf1e773936a76c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 18 May 2023 17:01:27 +0800 Subject: [PATCH 2/4] test(tdengine): add test cases to cover the super table feature --- .../test/emqx_bridge_tdengine_SUITE.erl | 204 ++++++++++++++++-- 1 file changed, 187 insertions(+), 17 deletions(-) diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 1b8db1aaa..00a887852 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -24,9 +24,21 @@ ");" ). -define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg"). --define(SQL_DELETE, "DELETE from t_mqtt_msg"). +-define(SQL_DROP_STABLE, "DROP STABLE s_tab"). +-define(SQL_DELETE, "DELETE FROM t_mqtt_msg"). -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg"). +-define(AUTO_CREATE_BRIDGE, + "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})" +). + +-define(SQL_CREATE_STABLE, + "CREATE STABLE s_tab (\n" + " ts timestamp,\n" + " payload BINARY(1024)\n" + ") TAGS (clientid BINARY(128));" +). + % DB defaults -define(TD_DATABASE, "mqtt"). -define(TD_USERNAME, "root"). @@ -53,12 +65,13 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout], + MustBatchCases = [t_batch_insert, t_auto_create_batch_insert], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, {with_batch, TCs -- NonBatchCases}, - {without_batch, TCs} + {without_batch, TCs -- MustBatchCases} ]. init_per_group(async, Config) -> @@ -117,7 +130,8 @@ common_init(ConfigT) -> Config0 = [ {td_host, Host}, {td_port, Port}, - {proxy_name, "tdengine_restful"} + {proxy_name, "tdengine_restful"}, + {template, ?SQL_BRIDGE} | ConfigT ], @@ -165,6 +179,7 @@ tdengine_config(BridgeType, Config) -> false -> 1 end, QueryMode = ?config(query_mode, Config), + Template = ?config(template, Config), ConfigString = io_lib:format( "bridges.~s.~s {\n" @@ -187,7 +202,7 @@ tdengine_config(BridgeType, Config) -> ?TD_DATABASE, ?TD_USERNAME, ?TD_PASSWORD, - ?SQL_BRIDGE, + Template, BatchSize, QueryMode ] @@ -272,11 +287,15 @@ connect_direct_tdengine(Config) -> connect_and_create_table(Config) -> ?WITH_CON(begin {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), - {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE) + {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE), + {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE) end). connect_and_drop_table(Config) -> - ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DROP_TABLE)). + ?WITH_CON(begin + {ok, _} = directly_query(Con, ?SQL_DROP_TABLE), + {ok, _} = directly_query(Con, ?SQL_DROP_STABLE) + end). connect_and_clear_table(Config) -> ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)). @@ -287,6 +306,15 @@ connect_and_get_payload(Config) -> ), Result. +connect_and_exec(Config, SQL) -> + ?WITH_CON({ok, _} = directly_query(Con, SQL)). + +connect_and_query(Config, SQL) -> + ?WITH_CON( + {ok, #{<<"code">> := 0, <<"data">> := Data}} = directly_query(Con, SQL) + ), + Data. + directly_query(Con, Query) -> directly_query(Con, Query, [{db_name, ?TD_DATABASE}]). @@ -407,7 +435,7 @@ t_write_failure(Config) -> #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - ?assertMatch({error, econnrefused}, Result), + ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result), ok end), ok. @@ -490,26 +518,19 @@ t_missing_data(Config) -> ok. t_bad_sql_parameter(Config) -> - EnableBatch = ?config(enable_batch, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), - Request = {sql, <<"">>, [bad_parameter]}, + Request = {send_message, <<"">>}, {_, {ok, #{result := Result}}} = ?wait_async_action( query_resource(Config, Request), #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - case EnableBatch of - true -> - ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); - false -> - ?assertMatch( - {error, {unrecoverable_error, _}}, Result - ) - end, + + ?assertMatch({error, #{<<"code">> := _}}, Result), ok. t_nasty_sql_string(Config) -> @@ -544,7 +565,156 @@ t_nasty_sql_string(Config) -> connect_and_get_payload(Config) ). +t_simple_insert(Config) -> + connect_and_clear_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + Request = {send_message, SentData}, + {_, {ok, #{result := _Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + ?PAYLOAD, + connect_and_get_payload(Config) + ). + +t_batch_insert(Config) -> + connect_and_clear_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Size = 5, + Ts = erlang:system_time(millisecond), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + lists:foreach( + fun(Idx) -> + SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx}, + Request = {send_message, SentData}, + query_resource(Config, Request) + end, + lists:seq(1, Size) + ), + + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + + timer:sleep(200), + + ?assertMatch( + [[Size]], + connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ). + +t_auto_create_simple_insert(Config0) -> + ClientId = to_str(?FUNCTION_NAME), + Config = get_auto_create_config(Config0), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + SentData = #{ + payload => ?PAYLOAD, + timestamp => 1668602148000, + clientid => ClientId + }, + Request = {send_message, SentData}, + {_, {ok, #{result := _Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + [[?PAYLOAD]], + connect_and_query(Config, "SELECT payload FROM " ++ ClientId) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId) + ). + +t_auto_create_batch_insert(Config0) -> + ClientId1 = "client1", + ClientId2 = "client2", + Config = get_auto_create_config(Config0), + + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Size1 = 2, + Size2 = 3, + + Ts = erlang:system_time(millisecond), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + lists:foreach( + fun({Offset, ClientId, Size}) -> + lists:foreach( + fun(Idx) -> + SentData = #{ + payload => ?PAYLOAD, + timestamp => Ts + Idx + Offset, + clientid => ClientId + }, + Request = {send_message, SentData}, + query_resource(Config, Request) + end, + lists:seq(1, Size) + ) + end, + [{0, ClientId1, Size1}, {100, ClientId2, Size2}] + ), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + + timer:sleep(200), + + ?assertMatch( + [[Size1]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ), + + ?assertMatch( + [[Size2]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId1) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId2) + ). + to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8); to_bin(Bin) when is_binary(Bin) -> Bin. + +to_str(Atom) when is_atom(Atom) -> + erlang:atom_to_list(Atom). + +get_auto_create_config(Config0) -> + Config = lists:keyreplace(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}), + BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>), + {_Name, TDConf} = tdengine_config(BridgeType, Config), + lists:keyreplace(tdengine_config, 1, Config, {tdengine_config, TDConf}). From f1a3e5965e0862b4b10f003fdcd98a697c70325c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 18 May 2023 17:34:28 +0800 Subject: [PATCH 3/4] chore: update apps version && changes --- apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src | 2 +- apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src | 2 +- changes/ee/fix-10738.en.md | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 changes/ee/fix-10738.en.md diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index 141973e1e..321a2b724 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [kernel, stdlib, tdengine]}, {env, []}, diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src index bfd7e68fa..82a95c377 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_plugin_libs, [ {description, "EMQX Plugin utility libs"}, - {vsn, "4.3.10"}, + {vsn, "4.3.11"}, {modules, []}, {applications, [kernel, stdlib]}, {env, []} diff --git a/changes/ee/fix-10738.en.md b/changes/ee/fix-10738.en.md new file mode 100644 index 000000000..e2fa14bfc --- /dev/null +++ b/changes/ee/fix-10738.en.md @@ -0,0 +1,3 @@ +Add supports for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge. +Before this fix, an insert with a supertable in the template will fail, like this: + `insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`. From 5a08a7b9de1393a115ed4bb89d4703af6e5433fd Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 19 May 2023 11:08:58 +0800 Subject: [PATCH 4/4] fix(tdengine): minor improvement of code and changes --- .../src/emqx_bridge_tdengine_connector.erl | 41 ++++++++++--------- .../test/emqx_bridge_tdengine_SUITE.erl | 37 ++++++++++------- changes/ee/fix-10738.en.md | 2 +- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 876743be5..5644d0446 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -208,8 +208,8 @@ execute(Conn, Query, Opts) -> do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> Queries = aggregate_query(Tokens, BatchReqs), - SQL = lists:foldl( - fun({InsertPart, Values}, Acc) -> + SQL = maps:fold( + fun(InsertPart, Values, Acc) -> lists:foldl( fun(ValuePart, IAcc) -> <> @@ -224,17 +224,15 @@ do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> execute(Conn, SQL, Opts). aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> - maps:to_list( - lists:foldl( - fun({_, Data}, Acc) -> - InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), - Values = maps:get(InsertPart, Acc, []), - maps:put(InsertPart, [ParamsPart | Values], Acc) - end, - #{}, - BatchReqs - ) + lists:foldl( + fun({_, Data}, Acc) -> + InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data), + ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data), + Values = maps:get(InsertPart, Acc, []), + maps:put(InsertPart, [ParamsPart | Values], Acc) + end, + #{}, + BatchReqs ). connect(Opts) -> @@ -268,8 +266,8 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> InsertTksMap#{Key => InsertTks}, BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} ); - _ -> - ?SLOG(error, #{msg => "split sql failed", sql => H}), + Result -> + ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end; {error, Reason} -> @@ -287,11 +285,14 @@ to_bin(List) when is_list(List) -> split_insert_sql(SQL0) -> SQL = emqx_plugin_libs_rule:formalize_sql(SQL0), - lists:foldr( - fun - (<<>>, Acc) -> Acc; - (E, Acc) -> [string:trim(E) | Acc] + lists:filtermap( + fun(E) -> + case string:trim(E) of + <<>> -> + false; + E1 -> + {true, E1} + end end, - [], re:split(SQL, "(?i)(insert into)|(?i)(values)") ). diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 00a887852..3d06aee52 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -594,7 +594,7 @@ t_batch_insert(Config) -> Size = 5, Ts = erlang:system_time(millisecond), - {_, {ok, #{result := Result}}} = + {_, {ok, #{result := _Result}}} = ?wait_async_action( lists:foreach( fun(Idx) -> @@ -609,11 +609,13 @@ t_batch_insert(Config) -> 2_000 ), - timer:sleep(200), - - ?assertMatch( - [[Size]], - connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ?retry( + _Sleep = 50, + _Attempts = 30, + ?assertMatch( + [[Size]], + connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ) ). t_auto_create_simple_insert(Config0) -> @@ -660,7 +662,7 @@ t_auto_create_batch_insert(Config0) -> Size2 = 3, Ts = erlang:system_time(millisecond), - {_, {ok, #{result := Result}}} = + {_, {ok, #{result := _Result}}} = ?wait_async_action( lists:foreach( fun({Offset, ClientId, Size}) -> @@ -683,16 +685,23 @@ t_auto_create_batch_insert(Config0) -> 2_000 ), - timer:sleep(200), + ?retry( + _Sleep = 50, + _Attempts = 30, - ?assertMatch( - [[Size1]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ?assertMatch( + [[Size1]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ) ), - ?assertMatch( - [[Size2]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ?retry( + 50, + 30, + ?assertMatch( + [[Size2]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ) ), ?assertMatch( diff --git a/changes/ee/fix-10738.en.md b/changes/ee/fix-10738.en.md index e2fa14bfc..203fb5823 100644 --- a/changes/ee/fix-10738.en.md +++ b/changes/ee/fix-10738.en.md @@ -1,3 +1,3 @@ -Add supports for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge. +Add support for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge. Before this fix, an insert with a supertable in the template will fail, like this: `insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.