diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 1b8db1aaa..00a887852 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -24,9 +24,21 @@ ");" ). -define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg"). --define(SQL_DELETE, "DELETE from t_mqtt_msg"). +-define(SQL_DROP_STABLE, "DROP STABLE s_tab"). +-define(SQL_DELETE, "DELETE FROM t_mqtt_msg"). -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg"). +-define(AUTO_CREATE_BRIDGE, + "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})" +). + +-define(SQL_CREATE_STABLE, + "CREATE STABLE s_tab (\n" + " ts timestamp,\n" + " payload BINARY(1024)\n" + ") TAGS (clientid BINARY(128));" +). + % DB defaults -define(TD_DATABASE, "mqtt"). -define(TD_USERNAME, "root"). @@ -53,12 +65,13 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), NonBatchCases = [t_write_timeout], + MustBatchCases = [t_batch_insert, t_auto_create_batch_insert], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, {with_batch, TCs -- NonBatchCases}, - {without_batch, TCs} + {without_batch, TCs -- MustBatchCases} ]. init_per_group(async, Config) -> @@ -117,7 +130,8 @@ common_init(ConfigT) -> Config0 = [ {td_host, Host}, {td_port, Port}, - {proxy_name, "tdengine_restful"} + {proxy_name, "tdengine_restful"}, + {template, ?SQL_BRIDGE} | ConfigT ], @@ -165,6 +179,7 @@ tdengine_config(BridgeType, Config) -> false -> 1 end, QueryMode = ?config(query_mode, Config), + Template = ?config(template, Config), ConfigString = io_lib:format( "bridges.~s.~s {\n" @@ -187,7 +202,7 @@ tdengine_config(BridgeType, Config) -> ?TD_DATABASE, ?TD_USERNAME, ?TD_PASSWORD, - ?SQL_BRIDGE, + Template, BatchSize, QueryMode ] @@ -272,11 +287,15 @@ connect_direct_tdengine(Config) -> connect_and_create_table(Config) -> ?WITH_CON(begin {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), - {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE) + {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE), + {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE) end). connect_and_drop_table(Config) -> - ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DROP_TABLE)). + ?WITH_CON(begin + {ok, _} = directly_query(Con, ?SQL_DROP_TABLE), + {ok, _} = directly_query(Con, ?SQL_DROP_STABLE) + end). connect_and_clear_table(Config) -> ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)). @@ -287,6 +306,15 @@ connect_and_get_payload(Config) -> ), Result. +connect_and_exec(Config, SQL) -> + ?WITH_CON({ok, _} = directly_query(Con, SQL)). + +connect_and_query(Config, SQL) -> + ?WITH_CON( + {ok, #{<<"code">> := 0, <<"data">> := Data}} = directly_query(Con, SQL) + ), + Data. + directly_query(Con, Query) -> directly_query(Con, Query, [{db_name, ?TD_DATABASE}]). @@ -407,7 +435,7 @@ t_write_failure(Config) -> #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - ?assertMatch({error, econnrefused}, Result), + ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result), ok end), ok. @@ -490,26 +518,19 @@ t_missing_data(Config) -> ok. t_bad_sql_parameter(Config) -> - EnableBatch = ?config(enable_batch, Config), ?assertMatch( {ok, _}, create_bridge(Config) ), - Request = {sql, <<"">>, [bad_parameter]}, + Request = {send_message, <<"">>}, {_, {ok, #{result := Result}}} = ?wait_async_action( query_resource(Config, Request), #{?snk_kind := buffer_worker_flush_ack}, 2_000 ), - case EnableBatch of - true -> - ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); - false -> - ?assertMatch( - {error, {unrecoverable_error, _}}, Result - ) - end, + + ?assertMatch({error, #{<<"code">> := _}}, Result), ok. t_nasty_sql_string(Config) -> @@ -544,7 +565,156 @@ t_nasty_sql_string(Config) -> connect_and_get_payload(Config) ). +t_simple_insert(Config) -> + connect_and_clear_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + Request = {send_message, SentData}, + {_, {ok, #{result := _Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + ?PAYLOAD, + connect_and_get_payload(Config) + ). + +t_batch_insert(Config) -> + connect_and_clear_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Size = 5, + Ts = erlang:system_time(millisecond), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + lists:foreach( + fun(Idx) -> + SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx}, + Request = {send_message, SentData}, + query_resource(Config, Request) + end, + lists:seq(1, Size) + ), + + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + + timer:sleep(200), + + ?assertMatch( + [[Size]], + connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") + ). + +t_auto_create_simple_insert(Config0) -> + ClientId = to_str(?FUNCTION_NAME), + Config = get_auto_create_config(Config0), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + SentData = #{ + payload => ?PAYLOAD, + timestamp => 1668602148000, + clientid => ClientId + }, + Request = {send_message, SentData}, + {_, {ok, #{result := _Result}}} = + ?wait_async_action( + query_resource(Config, Request), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + ?assertMatch( + [[?PAYLOAD]], + connect_and_query(Config, "SELECT payload FROM " ++ ClientId) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId) + ). + +t_auto_create_batch_insert(Config0) -> + ClientId1 = "client1", + ClientId2 = "client2", + Config = get_auto_create_config(Config0), + + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Size1 = 2, + Size2 = 3, + + Ts = erlang:system_time(millisecond), + {_, {ok, #{result := Result}}} = + ?wait_async_action( + lists:foreach( + fun({Offset, ClientId, Size}) -> + lists:foreach( + fun(Idx) -> + SentData = #{ + payload => ?PAYLOAD, + timestamp => Ts + Idx + Offset, + clientid => ClientId + }, + Request = {send_message, SentData}, + query_resource(Config, Request) + end, + lists:seq(1, Size) + ) + end, + [{0, ClientId1, Size1}, {100, ClientId2, Size2}] + ), + #{?snk_kind := buffer_worker_flush_ack}, + 2_000 + ), + + timer:sleep(200), + + ?assertMatch( + [[Size1]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + ), + + ?assertMatch( + [[Size2]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId1) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ ClientId2) + ). + to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8); to_bin(Bin) when is_binary(Bin) -> Bin. + +to_str(Atom) when is_atom(Atom) -> + erlang:atom_to_list(Atom). + +get_auto_create_config(Config0) -> + Config = lists:keyreplace(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}), + BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>), + {_Name, TDConf} = tdengine_config(BridgeType, Config), + lists:keyreplace(tdengine_config, 1, Config, {tdengine_config, TDConf}).