Merge pull request #10738 from lafirest/fix/tdengine_template

fix(tdengine): add support for the `super table` feature in the SQL template
This commit is contained in:
lafirest 2023-05-19 20:41:50 +08:00 committed by GitHub
commit 1aa904d019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 305 additions and 66 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [ {application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"}, {description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, tdengine]}, {applications, [kernel, stdlib, tdengine]},
{env, []}, {env, []},

View File

@ -25,7 +25,7 @@
on_get_status/2 on_get_status/2
]). ]).
-export([connect/1, do_get_status/1, execute/3]). -export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -124,32 +124,36 @@ on_stop(InstanceId, #{pool_name := PoolName}) ->
on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State); do_query(InstanceId, SQL, State);
on_query(InstanceId, Request, State) -> on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
%% because the `emqx-tdengine` client only supports a single SQL cmd case maps:find(Key, InsertTksMap) of
%% so the `on_query` and `on_batch_query` have the same process, that is: {ok, Tokens} ->
%% we need to collect all data into one SQL cmd and then call the insert API SQL = emqx_plugin_libs_rule:proc_sql_param_str(Tokens, Data),
on_batch_query(InstanceId, [Request], State). do_query(InstanceId, SQL, State);
_ ->
on_batch_query(
InstanceId,
BatchReq,
#{batch_inserts := Inserts, batch_params_tokens := ParamsTokens} = State
) ->
case hd(BatchReq) of
{Key, _} ->
case maps:get(Key, Inserts, undefined) of
undefined ->
{error, {unrecoverable_error, batch_prepare_not_implemented}};
InsertSQL ->
Tokens = maps:get(Key, ParamsTokens),
do_batch_insert(InstanceId, BatchReq, InsertSQL, Tokens, State)
end;
Request ->
LogMeta = #{connector => InstanceId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}),
{error, {unrecoverable_error, invalid_request}} {error, {unrecoverable_error, invalid_request}}
end. end.
%% Batch entry point.
%% Folding a whole batch into a single SQL statement is comparatively heavy
%% work, so only the token lookup happens here; the aggregation itself is
%% handed to `do_query_job/3' as a `do_batch_insert' job.
%%
%% The first request's key selects the prepared batch tokens. A key with no
%% batch tokens, or a request list that does not start with a `{Key, _}'
%% tuple, yields an unrecoverable error.
on_batch_query(
    InstanceId,
    [{Key, _} | _] = BatchReq,
    #{batch_tokens := BatchTksMap, query_opts := Opts} = State
) ->
    case maps:find(Key, BatchTksMap) of
        error ->
            {error, {unrecoverable_error, batch_prepare_not_implemented}};
        {ok, Tokens} ->
            Job = {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]},
            do_query_job(InstanceId, Job, State)
    end;
on_batch_query(InstanceId, BatchReq, State) ->
    ?SLOG(error, #{
        msg => "invalid request",
        connector => InstanceId,
        request => BatchReq,
        state => State
    }),
    {error, {unrecoverable_error, invalid_request}}.
on_get_status(_InstanceId, #{pool_name := PoolName}) -> on_get_status(_InstanceId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health). status_result(Health).
@ -167,17 +171,16 @@ status_result(_Status = false) -> connecting.
%% Helper fns %% Helper fns
%%======================================================================================== %%========================================================================================
do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) -> do_query(InstanceId, Query, #{query_opts := Opts} = State) ->
SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens), do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State).
do_query(InstanceId, SQL, State).
do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) -> do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
"tdengine_connector_received", "tdengine_connector_received",
#{connector => InstanceId, query => Query, state => State} #{connector => InstanceId, job => Job, state => State}
), ),
Result = ecpool:pick_and_do(PoolName, {?MODULE, execute, [Query, Opts]}, no_handover), Result = ecpool:pick_and_do(PoolName, Job, no_handover),
case Result of case Result of
{error, Reason} -> {error, Reason} ->
@ -188,7 +191,7 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
?SLOG(error, #{ ?SLOG(error, #{
msg => "tdengine_connector_do_query_failed", msg => "tdengine_connector_do_query_failed",
connector => InstanceId, connector => InstanceId,
query => Query, job => Job,
reason => Reason reason => Reason
}), }),
Result; Result;
@ -203,6 +206,35 @@ do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State
execute(Conn, Query, Opts) -> execute(Conn, Query, Opts) ->
tdengine:insert(Conn, Query, Opts). tdengine:insert(Conn, Query, Opts).
%% Renders a whole batch as one multi-table INSERT and executes it.
%%
%% `aggregate_query/2' groups the rendered value tuples by their rendered
%% insert target; each group becomes " <InsertPart> VALUES <row> <row> ..."
%% appended to the leading "INSERT INTO". `Conn' and `Opts' are passed
%% through unchanged to `execute/3'.
do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
    Grouped = aggregate_query(Tokens, BatchReqs),
    Statement = maps:fold(fun append_table_clause/3, <<"INSERT INTO">>, Grouped),
    execute(Conn, Statement, Opts).

%% Appends one " <InsertPart> VALUES <row>..." clause to the statement so far.
append_table_clause(InsertPart, Values, Acc) ->
    Rows = <<<<" ", Row/binary>> || Row <- Values>>,
    <<Acc/binary, " ", InsertPart/binary, " VALUES", Rows/binary>>.
%% Groups a batch by rendered insert target.
%%
%% For every `{_, Data}' request both token lists are rendered with
%% `emqx_plugin_libs_rule:proc_sql_param_str/2'. The result is a map from
%% the rendered insert part to the list of rendered value parts that
%% belong to it.
%%
%% The fold runs from the right and prepends, so each list keeps the rows
%% in the same order as they appear in `BatchReqs'. (A left fold with
%% prepend would hand the rows to the server in reverse arrival order.)
%% NOTE(review): row order presumably matters when two rows for the same
%% table share a timestamp - confirm against the TDengine INSERT docs.
aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
    lists:foldr(
        fun({_, Data}, Acc) ->
            InsertPart = emqx_plugin_libs_rule:proc_sql_param_str(InsertPartTks, Data),
            ParamsPart = emqx_plugin_libs_rule:proc_sql_param_str(ParamsPartTks, Data),
            maps:update_with(
                InsertPart,
                fun(Rows) -> [ParamsPart | Rows] end,
                [ParamsPart],
                Acc
            )
        end,
        #{},
        BatchReqs
    ).
connect(Opts) -> connect(Opts) ->
tdengine:start_link(Opts). tdengine:start_link(Opts).
@ -218,32 +250,49 @@ parse_prepare_sql(Config) ->
parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_batch_prepare_sql([{Key, H} | T], BatchInserts, BatchTks) -> parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
case emqx_plugin_libs_rule:detect_sql_type(H) of case emqx_plugin_libs_rule:detect_sql_type(H) of
{ok, select} -> {ok, select} ->
parse_batch_prepare_sql(T, BatchInserts, BatchTks); parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{ok, insert} -> {ok, insert} ->
case emqx_plugin_libs_rule:split_insert_sql(H) of InsertTks = emqx_plugin_libs_rule:preproc_tmpl(H),
{ok, {InsertSQL, Params}} -> H1 = string:trim(H, trailing, ";"),
ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(Params), case split_insert_sql(H1) of
[_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
InsertPartTks = emqx_plugin_libs_rule:preproc_tmpl(InsertPart),
ParamsPartTks = emqx_plugin_libs_rule:preproc_tmpl(ParamsPart),
parse_batch_prepare_sql( parse_batch_prepare_sql(
T, T,
BatchInserts#{Key => InsertSQL}, InsertTksMap#{Key => InsertTks},
BatchTks#{Key => ParamsTks} BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
); );
{error, Reason} -> Result ->
?SLOG(error, #{msg => "split sql failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
parse_batch_prepare_sql(T, BatchInserts, BatchTks) parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
end; end;
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}), ?SLOG(error, #{msg => "detect sql type failed", sql => H, reason => Reason}),
parse_batch_prepare_sql(T, BatchInserts, BatchTks) parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
end; end;
parse_batch_prepare_sql([], BatchInserts, BatchTks) -> parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) ->
#{ #{
batch_inserts => BatchInserts, insert_tokens => InsertTksMap,
batch_params_tokens => BatchTks batch_tokens => BatchTksMap
}. }.
to_bin(List) when is_list(List) -> to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8). unicode:characters_to_binary(List, utf8).
%% Breaks an INSERT template into its non-blank pieces around the
%% `insert into' and `values' keywords (both matched case-insensitively).
%%
%% The keywords are capture groups in the regex, so they show up in the
%% split result as well. Every piece is trimmed and empty pieces are
%% dropped, which means a plain template comes back as
%% [InsertKeyword, InsertPart, ValuesKeyword, ParamsPart].
split_insert_sql(SQL0) ->
    Normalized = emqx_plugin_libs_rule:formalize_sql(SQL0),
    Pieces = re:split(Normalized, "(?i)(insert into)|(?i)(values)"),
    [Piece || Raw <- Pieces, Piece <- [string:trim(Raw)], Piece =/= <<>>].

View File

@ -24,9 +24,21 @@
");" ");"
). ).
-define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg"). -define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg").
-define(SQL_DELETE, "DELETE from t_mqtt_msg"). -define(SQL_DROP_STABLE, "DROP STABLE s_tab").
-define(SQL_DELETE, "DELETE FROM t_mqtt_msg").
-define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg"). -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").
-define(AUTO_CREATE_BRIDGE,
"insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})"
).
-define(SQL_CREATE_STABLE,
"CREATE STABLE s_tab (\n"
" ts timestamp,\n"
" payload BINARY(1024)\n"
") TAGS (clientid BINARY(128));"
).
% DB defaults % DB defaults
-define(TD_DATABASE, "mqtt"). -define(TD_DATABASE, "mqtt").
-define(TD_USERNAME, "root"). -define(TD_USERNAME, "root").
@ -53,12 +65,13 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout], NonBatchCases = [t_write_timeout],
MustBatchCases = [t_batch_insert, t_auto_create_batch_insert],
BatchingGroups = [{group, with_batch}, {group, without_batch}], BatchingGroups = [{group, with_batch}, {group, without_batch}],
[ [
{async, BatchingGroups}, {async, BatchingGroups},
{sync, BatchingGroups}, {sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases}, {with_batch, TCs -- NonBatchCases},
{without_batch, TCs} {without_batch, TCs -- MustBatchCases}
]. ].
init_per_group(async, Config) -> init_per_group(async, Config) ->
@ -117,7 +130,8 @@ common_init(ConfigT) ->
Config0 = [ Config0 = [
{td_host, Host}, {td_host, Host},
{td_port, Port}, {td_port, Port},
{proxy_name, "tdengine_restful"} {proxy_name, "tdengine_restful"},
{template, ?SQL_BRIDGE}
| ConfigT | ConfigT
], ],
@ -165,6 +179,7 @@ tdengine_config(BridgeType, Config) ->
false -> 1 false -> 1
end, end,
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
Template = ?config(template, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
@ -187,7 +202,7 @@ tdengine_config(BridgeType, Config) ->
?TD_DATABASE, ?TD_DATABASE,
?TD_USERNAME, ?TD_USERNAME,
?TD_PASSWORD, ?TD_PASSWORD,
?SQL_BRIDGE, Template,
BatchSize, BatchSize,
QueryMode QueryMode
] ]
@ -272,11 +287,15 @@ connect_direct_tdengine(Config) ->
connect_and_create_table(Config) -> connect_and_create_table(Config) ->
?WITH_CON(begin ?WITH_CON(begin
{ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []),
{ok, _} = directly_query(Con, ?SQL_CREATE_TABLE) {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE),
{ok, _} = directly_query(Con, ?SQL_CREATE_STABLE)
end). end).
connect_and_drop_table(Config) -> connect_and_drop_table(Config) ->
?WITH_CON({ok, _} = directly_query(Con, ?SQL_DROP_TABLE)). ?WITH_CON(begin
{ok, _} = directly_query(Con, ?SQL_DROP_TABLE),
{ok, _} = directly_query(Con, ?SQL_DROP_STABLE)
end).
connect_and_clear_table(Config) -> connect_and_clear_table(Config) ->
?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)). ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)).
@ -287,6 +306,15 @@ connect_and_get_payload(Config) ->
), ),
Result. Result.
%% Runs `SQL' through directly_query/2 inside ?WITH_CON and asserts that the
%% reply matches `{ok, _}'.
%% NOTE(review): ?WITH_CON is defined outside this view; presumably it opens
%% a connection from `Config' and binds it as `Con' - confirm against the macro.
connect_and_exec(Config, SQL) ->
?WITH_CON({ok, _} = directly_query(Con, SQL)).
%% Runs `SQL' through directly_query/2 inside ?WITH_CON, asserts that the
%% reply is `{ok, Map}' with <<"code">> equal to 0, and returns the value
%% stored under <<"data">>.
%% NOTE(review): relies on `Data', bound inside the ?WITH_CON argument, still
%% being visible after the macro call - confirm against the macro definition.
connect_and_query(Config, SQL) ->
?WITH_CON(
{ok, #{<<"code">> := 0, <<"data">> := Data}} = directly_query(Con, SQL)
),
Data.
directly_query(Con, Query) -> directly_query(Con, Query) ->
directly_query(Con, Query, [{db_name, ?TD_DATABASE}]). directly_query(Con, Query, [{db_name, ?TD_DATABASE}]).
@ -407,7 +435,7 @@ t_write_failure(Config) ->
#{?snk_kind := buffer_worker_flush_ack}, #{?snk_kind := buffer_worker_flush_ack},
2_000 2_000
), ),
?assertMatch({error, econnrefused}, Result), ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result),
ok ok
end), end),
ok. ok.
@ -490,26 +518,19 @@ t_missing_data(Config) ->
ok. ok.
t_bad_sql_parameter(Config) -> t_bad_sql_parameter(Config) ->
EnableBatch = ?config(enable_batch, Config),
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
Request = {sql, <<"">>, [bad_parameter]}, Request = {send_message, <<"">>},
{_, {ok, #{result := Result}}} = {_, {ok, #{result := Result}}} =
?wait_async_action( ?wait_async_action(
query_resource(Config, Request), query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack}, #{?snk_kind := buffer_worker_flush_ack},
2_000 2_000
), ),
case EnableBatch of
true -> ?assertMatch({error, #{<<"code">> := _}}, Result),
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertMatch(
{error, {unrecoverable_error, _}}, Result
)
end,
ok. ok.
t_nasty_sql_string(Config) -> t_nasty_sql_string(Config) ->
@ -544,7 +565,165 @@ t_nasty_sql_string(Config) ->
connect_and_get_payload(Config) connect_and_get_payload(Config)
). ).
%% Single-message insert through the bridge.
%% Clears the table, creates the bridge, sends one `send_message' request and
%% waits for a `buffer_worker_flush_ack' event (timeout argument 2_000), then
%% asserts that connect_and_get_payload/1 returns ?PAYLOAD.
t_simple_insert(Config) ->
connect_and_clear_table(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
Request = {send_message, SentData},
%% the query result itself is not inspected here, only that a flush was acked
{_, {ok, #{result := _Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
?PAYLOAD,
connect_and_get_payload(Config)
).
%% Batch insert into the plain table.
%% Clears the table, creates the bridge and sends `Size' messages whose
%% timestamps are `Ts + Idx', so every message has its own timestamp. After
%% the first `buffer_worker_flush_ack' event the row count is polled with
%% ?retry until `SELECT COUNT(1)' returns [[Size]].
t_batch_insert(Config) ->
connect_and_clear_table(Config),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Size = 5,
Ts = erlang:system_time(millisecond),
{_, {ok, #{result := _Result}}} =
?wait_async_action(
lists:foreach(
fun(Idx) ->
SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx},
Request = {send_message, SentData},
query_resource(Config, Request)
end,
lists:seq(1, Size)
),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
%% only one flush ack was awaited above, so keep polling until all rows are visible
?retry(
_Sleep = 50,
_Attempts = 30,
?assertMatch(
[[Size]],
connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
)
).
%% Single-message insert using the auto-create template.
%% The test function's own name is used as the `clientid' value and as the
%% table name queried afterwards. The bridge is created from the config
%% returned by get_auto_create_config/1. After one message is flushed the
%% test asserts that the table holds [[?PAYLOAD]], then drops the table and
%% asserts that the drop returns [[0]].
t_auto_create_simple_insert(Config0) ->
ClientId = to_str(?FUNCTION_NAME),
Config = get_auto_create_config(Config0),
?assertMatch(
{ok, _},
create_bridge(Config)
),
SentData = #{
payload => ?PAYLOAD,
timestamp => 1668602148000,
clientid => ClientId
},
Request = {send_message, SentData},
{_, {ok, #{result := _Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
[[?PAYLOAD]],
connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
),
%% clean up the table created by this test
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId)
).
%% Batch insert using the auto-create template with two client ids.
%% Sends `Size1' messages for "client1" and `Size2' messages for "client2".
%% Timestamps are `Ts + Idx + Offset' with offsets 0 and 100, so the two
%% groups do not share timestamps. After the first `buffer_worker_flush_ack'
%% event the row count of each table is polled with ?retry until it equals
%% the number of messages sent for that client. Both tables are dropped at
%% the end and each drop is asserted to return [[0]].
t_auto_create_batch_insert(Config0) ->
ClientId1 = "client1",
ClientId2 = "client2",
Config = get_auto_create_config(Config0),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Size1 = 2,
Size2 = 3,
Ts = erlang:system_time(millisecond),
{_, {ok, #{result := _Result}}} =
?wait_async_action(
lists:foreach(
fun({Offset, ClientId, Size}) ->
lists:foreach(
fun(Idx) ->
SentData = #{
payload => ?PAYLOAD,
timestamp => Ts + Idx + Offset,
clientid => ClientId
},
Request = {send_message, SentData},
query_resource(Config, Request)
end,
lists:seq(1, Size)
)
end,
[{0, ClientId1, Size1}, {100, ClientId2, Size2}]
),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
%% same polling as below, written with named sleep/attempt arguments
?retry(
_Sleep = 50,
_Attempts = 30,
?assertMatch(
[[Size1]],
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
)
),
?retry(
50,
30,
?assertMatch(
[[Size2]],
connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
)
),
%% clean up both tables created by this test
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId1)
),
?assertMatch(
[[0]],
connect_and_query(Config, "DROP TABLE " ++ ClientId2)
).
to_bin(List) when is_list(List) -> to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8); unicode:characters_to_binary(List, utf8);
to_bin(Bin) when is_binary(Bin) -> to_bin(Bin) when is_binary(Bin) ->
Bin. Bin.
%% Returns the textual name of an atom as a string (character list).
%% Only atoms are accepted; anything else fails with `function_clause'.
to_str(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
%% Returns a copy of the test config that uses the auto-create template.
%%
%% The `template' entry is set to ?AUTO_CREATE_BRIDGE, the bridge config is
%% regenerated from that via tdengine_config/2, and the result is stored
%% under `tdengine_config'.
%%
%% lists:keystore/4 is used instead of lists:keyreplace/4 so that an entry is
%% added when the key is missing; keyreplace would leave the list unchanged
%% and the test would silently run with the previous template/config.
get_auto_create_config(Config0) ->
    Config = lists:keystore(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}),
    BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>),
    {_Name, TDConf} = tdengine_config(BridgeType, Config),
    lists:keystore(tdengine_config, 1, Config, {tdengine_config, TDConf}).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_plugin_libs, [ {application, emqx_plugin_libs, [
{description, "EMQX Plugin utility libs"}, {description, "EMQX Plugin utility libs"},
{vsn, "4.3.10"}, {vsn, "4.3.11"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib]}, {applications, [kernel, stdlib]},
{env, []} {env, []}

View File

@ -32,7 +32,8 @@
proc_cql_param_str/2, proc_cql_param_str/2,
split_insert_sql/1, split_insert_sql/1,
detect_sql_type/1, detect_sql_type/1,
proc_batch_sql/3 proc_batch_sql/3,
formalize_sql/1
]). ]).
%% type converting %% type converting
@ -126,7 +127,8 @@ proc_cql_param_str(Tokens, Data) ->
-spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when -spec split_insert_sql(binary()) -> {ok, {InsertSQL, Params}} | {error, atom()} when
InsertSQL :: binary(), InsertSQL :: binary(),
Params :: binary(). Params :: binary().
split_insert_sql(SQL) -> split_insert_sql(SQL0) ->
SQL = formalize_sql(SQL0),
case re:split(SQL, "((?i)values)", [{return, binary}]) of case re:split(SQL, "((?i)values)", [{return, binary}]) of
[Part1, _, Part3] -> [Part1, _, Part3] ->
case string:trim(Part1, leading) of case string:trim(Part1, leading) of
@ -173,6 +175,12 @@ proc_batch_sql(BatchReqs, InsertPart, Tokens) ->
), ),
<<InsertPart/binary, " values ", ValuesPart/binary>>. <<InsertPart/binary, " values ", ValuesPart/binary>>.
%% Normalizes the whitespace of an SQL text.
%% Every run of whitespace (spaces, tabs, '\r', '\n', ...) is collapsed into
%% one space character, then leading and trailing whitespace is trimmed.
%% The result is always a binary.
formalize_sql(Input) ->
    string:trim(re:replace(Input, "\\s+", " ", [global, {return, binary}])).
unsafe_atom_key(Key) when is_atom(Key) -> unsafe_atom_key(Key) when is_atom(Key) ->
Key; Key;
unsafe_atom_key(Key) when is_binary(Key) -> unsafe_atom_key(Key) when is_binary(Key) ->

View File

@ -0,0 +1,3 @@
Add support for the `Supertable` and `Create Tables Automatically` features of TDEngine to its data bridge.
Before this fix, an insert statement that used a supertable in the template would fail, for example:
`insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.