feat(tdengine): improve the TDengine bridge to v2 style

This commit is contained in:
firest 2024-02-01 23:12:59 +08:00
parent 5bc67cb288
commit dfad020c49
10 changed files with 492 additions and 530 deletions

View File

@ -102,7 +102,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_iotdb_action_info, emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info, emqx_bridge_es_action_info,
emqx_bridge_opents_action_info, emqx_bridge_opents_action_info,
emqx_bridge_greptimedb_action_info emqx_bridge_greptimedb_action_info,
emqx_bridge_tdengine_action_info
]. ].
-else. -else.
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->

View File

@ -74,7 +74,7 @@ desc(connector_resource_opts) ->
desc("config_connector") -> desc("config_connector") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -9,35 +9,21 @@
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([ -export([conn_bridge_examples/1, values/1, bridge_v2_examples/1]).
conn_bridge_examples/1, -export([namespace/0, roots/0, fields/1, desc/1]).
values/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-define(DEFAULT_SQL, << -define(DEFAULT_SQL, <<
"insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) " "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, "
"values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})" "arrived) values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', "
"${timestamp})"
>>). >>).
-define(CONNECTOR_TYPE, tdengine).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% v1 examples
conn_bridge_examples(Method) -> conn_bridge_examples(Method) ->
[ [#{<<"tdengine">> => #{summary => <<"TDengine Bridge">>, value => values(Method)}}].
#{
<<"tdengine">> => #{
summary => <<"TDengine Bridge">>,
value => values(Method)
}
}
].
values(_Method) -> values(_Method) ->
#{ #{
@ -51,21 +37,46 @@ values(_Method) ->
password => <<"******">>, password => <<"******">>,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts =>
worker_pool_size => 8, #{
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, worker_pool_size => 8,
batch_size => ?DEFAULT_BATCH_SIZE, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
batch_time => ?DEFAULT_BATCH_TIME, batch_size => ?DEFAULT_BATCH_SIZE,
query_mode => sync, batch_time => ?DEFAULT_BATCH_TIME,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES query_mode => sync,
max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
}
}.
%% -------------------------------------------------------------------------------------------------
%% v2 examples
bridge_v2_examples(Method) ->
[
#{
<<"tdengine">> => #{
summary => <<"TDengine Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
parameters => #{
database => <<"mqtt">>,
sql => ?DEFAULT_SQL
} }
}. }.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% v1 Hocon Schema Definitions
namespace() -> "bridge_tdengine". namespace() ->
"bridge_tdengine".
roots() -> []. roots() ->
[].
fields("config") -> fields("config") ->
[ [
@ -73,24 +84,68 @@ fields("config") ->
{sql, {sql,
mk( mk(
binary(), binary(),
#{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} #{
desc => ?DESC("sql_template"),
default => ?DEFAULT_SQL,
format => <<"sql">>
}
)}, )},
{local_topic, {local_topic, mk(binary(), #{desc => ?DESC("local_topic"), default => undefined})}
mk( ] ++
binary(), emqx_resource_schema:fields("resource_opts") ++
#{desc => ?DESC("local_topic"), default => undefined}
)}
] ++ emqx_resource_schema:fields("resource_opts") ++
emqx_bridge_tdengine_connector:fields(config); emqx_bridge_tdengine_connector:fields(config);
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post"). emqx_bridge_schema:status_fields() ++ fields("post");
%% -------------------------------------------------------------------------------------------------
%% v2 Hocon Schema Definitions
fields(action) ->
{tdengine,
mk(
hoconsc:map(name, ref(?MODULE, action_config)),
#{
desc => <<"TDengine Action Config">>,
required => false
}
)};
fields(action_config) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
ref(?MODULE, action_parameters),
#{
required => true, desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
[
{database, fun emqx_connector_schema_lib:database/1},
{sql,
mk(
binary(),
#{
desc => ?DESC("sql_template"),
default => ?DEFAULT_SQL,
format => <<"sql">>
}
)}
];
fields("post_bridge_v2") ->
emqx_bridge_schema:type_and_name_fields(enum([tdengine])) ++ fields(action_config);
fields("put_bridge_v2") ->
fields(action_config);
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(action_config) ->
?DESC("desc_config");
desc(action_parameters) ->
?DESC("action_parameters");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for TDengine using `", string:to_upper(Method), "` method."]; ["Configuration for TDengine using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_tdengine_action_info).
-behaviour(emqx_action_info).
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
%% behaviour callbacks
-export([
action_type_name/0,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name/0,
connector_action_config_to_bridge_v1_config/2,
connector_type_name/0,
schema_module/0
]).
-import(emqx_utils_conv, [bin/1]).
-define(ACTION_TYPE, tdengine).
-define(SCHEMA_MODULE, emqx_bridge_tdengine).
action_type_name() -> ?ACTION_TYPE.
bridge_v1_type_name() -> ?ACTION_TYPE.
connector_type_name() -> ?ACTION_TYPE.
schema_module() -> ?SCHEMA_MODULE.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
MergedConfig =
emqx_utils_maps:deep_merge(
maps:without(
[<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>],
emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
),
ConnectorConfig
),
BridgeV1Keys = schema_keys("config"),
maps:with(BridgeV1Keys, MergedConfig).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(action_config),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnectorKeys = schema_keys(emqx_bridge_tdengine_connector, "config_connector"),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
maps:with(ConnectorKeys, BridgeV1Config)
).
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
schema_keys(Name) ->
schema_keys(?SCHEMA_MODULE, Name).
schema_keys(Mod, Name) ->
[bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))].

View File

@ -11,7 +11,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-export([roots/0, fields/1]). -export([namespace/0, roots/0, fields/1, desc/1]).
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
@ -20,9 +20,15 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_get_status/2 on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-export([connector_examples/1]).
-export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). -export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
@ -31,8 +37,12 @@
default_port => 6041 default_port => 6041
}). }).
-define(CONNECTOR_TYPE, tdengine).
namespace() -> "tdengine_connector".
%%===================================================================== %%=====================================================================
%% Hocon schema %% V1 Hocon schema
roots() -> roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}]. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
@ -40,17 +50,45 @@ fields(config) ->
[ [
{server, server()} {server, server()}
| adjust_fields(emqx_connector_schema_lib:relational_db_fields()) | adjust_fields(emqx_connector_schema_lib:relational_db_fields())
]. ];
%%=====================================================================
%% V2 Hocon schema
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
fields(config) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("post") ->
emqx_connector_schema:type_and_name_fields(enum([tdengine])) ++ fields("config_connector");
fields("put") ->
fields("config_connector");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post").
desc(config) ->
?DESC("desc_config");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc("config_connector") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for TDengine using `", string:to_upper(Method), "` method."];
desc(_) ->
undefined.
adjust_fields(Fields) -> adjust_fields(Fields) ->
lists:map( lists:filtermap(
fun fun
({username, OrigUsernameFn}) -> ({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"root">>)}; {true, {username, add_default_fn(OrigUsernameFn, <<"root">>)}};
({password, _}) -> ({password, _}) ->
{password, emqx_connector_schema_lib:password_field(#{required => true})}; {true, {password, emqx_connector_schema_lib:password_field(#{required => true})}};
(Field) -> ({database, _}) ->
Field false;
(_Field) ->
true
end, end,
Fields Fields
). ).
@ -65,6 +103,32 @@ server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS).
%%=====================================================================
%% V2 Hocon schema
connector_examples(Method) ->
[
#{
<<"tdengine">> =>
#{
summary => <<"TDengine Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_example_values()
)
}
}
].
connector_example_values() ->
#{
name => <<"tdengine_connector">>,
type => tdengine,
enable => true,
server => <<"127.0.0.1:6041">>,
pool_size => 8,
username => <<"root">>,
password => <<"******">>
}.
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
@ -93,11 +157,10 @@ on_start(
{username, Username}, {username, Username},
{password, Password}, {password, Password},
{pool_size, PoolSize}, {pool_size, PoolSize},
{pool, binary_to_atom(InstanceId, utf8)} {pool, InstanceId}
], ],
Prepares = parse_prepare_sql(Config), State = #{pool_name => InstanceId, channels => #{}},
State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)},
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -110,34 +173,33 @@ on_stop(InstanceId, _State) ->
msg => "stopping_tdengine_connector", msg => "stopping_tdengine_connector",
connector => InstanceId connector => InstanceId
}), }),
?tp(tdengine_connector_stop, #{instance_id => InstanceId}),
emqx_resource_pool:stop(InstanceId). emqx_resource_pool:stop(InstanceId).
on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) ->
do_query(InstanceId, SQL, State); case maps:find(ChannelId, Channels) of
on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> {ok, #{insert := Tokens, opts := Opts}} ->
case maps:find(Key, InsertTksMap) of Query = emqx_placeholder:proc_tmpl(Tokens, Data),
{ok, Tokens} when is_map(Data) -> do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State);
SQL = emqx_placeholder:proc_tmpl(Tokens, Data),
do_query(InstanceId, SQL, State);
_ -> _ ->
{error, {unrecoverable_error, invalid_request}} {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}}
end. end.
%% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process %% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process
on_batch_query( on_batch_query(
InstanceId, InstanceId,
[{Key, _Data = #{}} | _] = BatchReq, [{ChannelId, _Data = #{}} | _] = BatchReq,
#{batch_tokens := BatchTksMap, query_opts := Opts} = State #{channels := Channels} = State
) -> ) ->
case maps:find(Key, BatchTksMap) of case maps:find(ChannelId, Channels) of
{ok, Tokens} -> {ok, #{batch := Tokens, opts := Opts}} ->
do_query_job( do_query_job(
InstanceId, InstanceId,
{?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]},
State State
); );
_ -> _ ->
{error, {unrecoverable_error, batch_prepare_not_implemented}} {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}}
end; end;
on_batch_query(InstanceId, BatchReq, State) -> on_batch_query(InstanceId, BatchReq, State) ->
LogMeta = #{connector => InstanceId, request => BatchReq, state => State}, LogMeta = #{connector => InstanceId, request => BatchReq, state => State},
@ -157,13 +219,46 @@ do_get_status(Conn) ->
status_result(_Status = true) -> connected; status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting. status_result(_Status = false) -> connecting.
on_add_channel(
_InstanceId,
#{channels := Channels} = OldState,
ChannelId,
#{
parameters := #{database := Database, sql := SQL}
}
) ->
case maps:is_key(ChannelId, Channels) of
true ->
{error, already_exists};
_ ->
case parse_prepare_sql(SQL) of
{ok, Result} ->
Opts = [{db_name, Database}],
Channels2 = Channels#{ChannelId => Result#{opts => Opts}},
{ok, OldState#{channels := Channels2}};
Error ->
Error
end
end.
on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) ->
{ok, OldState#{channels => maps:remove(ChannelId, Channels)}}.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
case maps:is_key(ChannelId, Channels) of
true ->
on_get_status(InstanceId, State);
_ ->
{error, not_exists}
end.
%%======================================================================================== %%========================================================================================
%% Helper fns %% Helper fns
%%======================================================================================== %%========================================================================================
do_query(InstanceId, Query, #{query_opts := Opts} = State) ->
do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State).
do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -171,12 +266,11 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
#{connector => InstanceId, job => Job, state => State} #{connector => InstanceId, job => Job, state => State}
), ),
Result = ecpool:pick_and_do(PoolName, Job, no_handover), Result = ecpool:pick_and_do(PoolName, Job, no_handover),
case Result of case Result of
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
tdengine_connector_query_return, tdengine_connector_query_return,
#{error => Reason} #{instance_id => InstanceId, error => Reason}
), ),
?SLOG(error, #{ ?SLOG(error, #{
msg => "tdengine_connector_do_query_failed", msg => "tdengine_connector_do_query_failed",
@ -193,7 +287,7 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
_ -> _ ->
?tp( ?tp(
tdengine_connector_query_return, tdengine_connector_query_return,
#{result => Result} #{instance_id => InstanceId, result => Result}
), ),
Result Result
end. end.
@ -221,49 +315,23 @@ connect(Opts) ->
NOpts = [{password, emqx_secret:unwrap(Secret)} | OptsRest], NOpts = [{password, emqx_secret:unwrap(Secret)} | OptsRest],
tdengine:start_link(NOpts). tdengine:start_link(NOpts).
query_opts(#{database := Database} = _Opts) -> parse_prepare_sql(SQL) ->
[{db_name, Database}]. case emqx_utils_sql:get_statement_type(SQL) of
parse_prepare_sql(Config) ->
SQL =
case maps:get(sql, Config, undefined) of
undefined -> #{};
Template -> #{send_message => Template}
end,
parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}).
parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
case emqx_utils_sql:get_statement_type(H) of
select ->
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
insert -> insert ->
InsertTks = emqx_placeholder:preproc_tmpl(H), InsertTks = emqx_placeholder:preproc_tmpl(SQL),
H1 = string:trim(H, trailing, ";"), SQL1 = string:trim(SQL, trailing, ";"),
case split_insert_sql(H1) of case split_insert_sql(SQL1) of
[_InsertPart, BatchDesc] -> [_InsertPart, BatchDesc] ->
BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc), BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc),
parse_batch_prepare_sql( {ok, #{insert => InsertTks, batch => BatchTks}};
T,
InsertTksMap#{Key => InsertTks},
BatchTksMap#{Key => BatchTks}
);
Result -> Result ->
?SLOG(error, #{msg => "split_sql_failed", sql => H, result => Result}), {error, #{msg => "split_sql_failed", sql => SQL, result => Result}}
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap)
end; end;
Type when is_atom(Type) -> Type when is_atom(Type) ->
?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}), {error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}};
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}), {error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}}
parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) end.
end;
parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) ->
#{
insert_tokens => InsertTksMap,
batch_tokens => BatchTksMap
}.
to_bin(List) when is_list(List) -> to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8). unicode:characters_to_binary(List, utf8).

View File

@ -54,6 +54,11 @@
ok = tdengine:stop(Con) ok = tdengine:stop(Con)
). ).
-define(BRIDGE_TYPE_BIN, <<"tdengine">>).
-define(APPS, [
hackney, tdengine, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_tdengine
]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -66,16 +71,21 @@ all() ->
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), TCs = emqx_common_test_helpers:all(?MODULE),
NonBatchCases = [t_write_timeout],
MustBatchCases = [t_batch_insert, t_auto_create_batch_insert], MustBatchCases = [t_batch_insert, t_auto_create_batch_insert],
BatchingGroups = [{group, with_batch}, {group, without_batch}], BatchingGroups = [{group, with_batch}, {group, without_batch}],
[ [
{async, BatchingGroups}, {async, BatchingGroups},
{sync, BatchingGroups}, {sync, BatchingGroups},
{with_batch, TCs -- NonBatchCases}, {with_batch, TCs},
{without_batch, TCs -- MustBatchCases} {without_batch, TCs -- MustBatchCases}
]. ].
init_per_suite(Config) ->
emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
end_per_suite(Config) ->
emqx_bridge_v2_testlib:end_per_suite(Config).
init_per_group(async, Config) -> init_per_group(async, Config) ->
[{query_mode, async} | Config]; [{query_mode, async} | Config];
init_per_group(sync, Config) -> init_per_group(sync, Config) ->
@ -89,36 +99,37 @@ init_per_group(without_batch, Config0) ->
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> end_per_group(default, Config) ->
connect_and_drop_table(Config), emqx_bridge_v2_testlib:end_per_group(Config),
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok; ok;
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_suite(Config) -> init_per_testcase(TestCase, Config0) ->
connect_and_clear_table(Config0),
Type = ?config(bridge_type, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary
>>,
{_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
{_, ActionConfig} = action_config(TestCase, Name, Config0),
Config = [
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, Type},
{bridge_name, Name},
{bridge_config, ActionConfig}
| Config0
],
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
ok = snabbkaffe:start_trace(),
Config. Config.
end_per_suite(_Config) -> end_per_testcase(TestCase, Config) ->
emqx_mgmt_api_test_util:end_suite(), emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config),
ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
ok.
init_per_testcase(_Testcase, Config) ->
connect_and_clear_table(Config), connect_and_clear_table(Config),
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
connect_and_clear_table(Config),
ok = snabbkaffe:stop(),
delete_bridge(Config),
ok. ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -132,34 +143,14 @@ common_init(ConfigT) ->
Config0 = [ Config0 = [
{td_host, Host}, {td_host, Host},
{td_port, Port}, {td_port, Port},
{proxy_name, "tdengine_restful"}, {proxy_name, "tdengine_restful"}
{template, ?SQL_BRIDGE}
| ConfigT | ConfigT
], ],
BridgeType = proplists:get_value(bridge_type, Config0, <<"tdengine">>),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true -> true ->
% Setup toxiproxy Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), connect_and_create_table(Config),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
% Ensure enterprise bridge module is loaded
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, tdengine]),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
% Connect to tdengine directly and create the table
connect_and_create_table(Config0),
{Name, TDConf} = tdengine_config(BridgeType, Config0),
Config =
[
{tdengine_config, TDConf},
{tdengine_bridge_type, BridgeType},
{tdengine_name, Name},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort}
| Config0
],
Config; Config;
false -> false ->
case os:getenv("IS_CI") of case os:getenv("IS_CI") of
@ -170,97 +161,100 @@ common_init(ConfigT) ->
end end
end. end.
tdengine_config(BridgeType, Config) -> action_config(TestCase, Name, Config) ->
Port = integer_to_list(?config(td_port, Config)), Type = ?config(bridge_type, Config),
Server = ?config(td_host, Config) ++ ":" ++ Port,
Name = atom_to_binary(?MODULE),
BatchSize = BatchSize =
case ?config(enable_batch, Config) of case ?config(enable_batch, Config) of
true -> ?BATCH_SIZE; true -> ?BATCH_SIZE;
false -> 1 false -> 1
end, end,
QueryMode = ?config(query_mode, Config), QueryMode = ?config(query_mode, Config),
Template = ?config(template, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.~s.~s {\n" "actions.~s.~s {\n"
" enable = true\n" " enable = true\n"
" server = ~p\n" " connector = \"~s\"\n"
" database = ~p\n" " parameters = {\n"
" username = ~p\n" " database = ~p\n"
" password = ~p\n" " sql = ~p\n"
" sql = ~p\n" " }\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_ttl = 500ms\n" " request_ttl = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
"}", "}\n",
[ [
BridgeType, Type,
Name,
Name, Name,
Server,
?TD_DATABASE, ?TD_DATABASE,
?TD_USERNAME, case TestCase of
?TD_PASSWORD, Auto when
Template, Auto =:= t_auto_create_simple_insert; Auto =:= t_auto_create_batch_insert
->
?AUTO_CREATE_BRIDGE;
_ ->
?SQL_BRIDGE
end,
BatchSize, BatchSize,
QueryMode QueryMode
] ]
), ),
{Name, parse_and_check(ConfigString, BridgeType, Name)}. ct:pal("ActionConfig:~ts~n", [ConfigString]),
{ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
parse_and_check(ConfigString, BridgeType, Name) -> connector_config(Name, Config) ->
Host = ?config(td_host, Config),
Port = ?config(td_port, Config),
Type = ?config(bridge_type, Config),
Server = Host ++ ":" ++ integer_to_list(Port),
ConfigString =
io_lib:format(
"connectors.~s.~s {\n"
" enable = true\n"
" server = \"~s\"\n"
" username = ~p\n"
" password = ~p\n"
"}\n",
[
Type,
Name,
Server,
?TD_USERNAME,
?TD_PASSWORD
]
),
ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
{ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
parse_action_and_check(ConfigString, BridgeType, Name) ->
parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
parse_connector_and_check(ConfigString, ConnectorType, Name) ->
parse_and_check(
ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
).
parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
Type = to_bin(Type0),
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, #{RootKey := #{Type := #{Name := Config}}} = RawConf,
Config. Config.
create_bridge(Config) -> to_bin(List) when is_list(List) ->
create_bridge(Config, _Overrides = #{}). unicode:characters_to_binary(List, utf8);
to_bin(Atom) when is_atom(Atom) ->
create_bridge(Config, Overrides) -> erlang:atom_to_binary(Atom);
BridgeType = ?config(tdengine_bridge_type, Config), to_bin(Bin) when is_binary(Bin) ->
Name = ?config(tdengine_name, Config), Bin.
TDConfig0 = ?config(tdengine_config, Config),
TDConfig = emqx_utils_maps:deep_merge(TDConfig0, Overrides),
emqx_bridge:create(BridgeType, Name, TDConfig).
delete_bridge(Config) ->
BridgeType = ?config(tdengine_bridge_type, Config),
Name = ?config(tdengine_name, Config),
emqx_bridge:remove(BridgeType, Name).
create_bridge_http(Params) ->
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
Error -> Error
end.
send_message(Config, Payload) -> send_message(Config, Payload) ->
Name = ?config(tdengine_name, Config), BridgeType = ?config(bridge_type, Config),
BridgeType = ?config(tdengine_bridge_type, Config), Name = ?config(bridge_name, Config),
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), ct:print(">>> Name:~p~n BridgeType:~p~n", [Name, BridgeType]),
emqx_bridge:send_message(BridgeID, Payload). emqx_bridge_v2:send_message(BridgeType, Name, Payload, #{}).
query_resource(Config, Request) ->
Name = ?config(tdengine_name, Config),
BridgeType = ?config(tdengine_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
query_resource_async(Config, Request) ->
Name = ?config(tdengine_name, Config),
BridgeType = ?config(tdengine_bridge_type, Config),
Ref = alias([reply]),
AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
Return = emqx_resource:query(ResourceID, Request, #{
timeout => 500, async_reply_fun => {AsyncReplyFun, []}
}),
{Return, Ref}.
receive_result(Ref, Timeout) -> receive_result(Ref, Timeout) ->
receive receive
@ -287,17 +281,13 @@ connect_direct_tdengine(Config) ->
% These funs connect and then stop the tdengine connection % These funs connect and then stop the tdengine connection
connect_and_create_table(Config) -> connect_and_create_table(Config) ->
?WITH_CON(begin ?WITH_CON(begin
{ok, _} = directly_query(Con, ?SQL_DROP_TABLE),
{ok, _} = directly_query(Con, ?SQL_DROP_STABLE),
{ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []),
{ok, _} = directly_query(Con, ?SQL_CREATE_TABLE), {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE),
{ok, _} = directly_query(Con, ?SQL_CREATE_STABLE) {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE)
end). end).
connect_and_drop_table(Config) ->
?WITH_CON(begin
{ok, _} = directly_query(Con, ?SQL_DROP_TABLE),
{ok, _} = directly_query(Con, ?SQL_DROP_STABLE)
end).
connect_and_clear_table(Config) -> connect_and_clear_table(Config) ->
?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)). ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)).
@ -322,275 +312,53 @@ directly_query(Con, Query) ->
directly_query(Con, Query, QueryOpts) -> directly_query(Con, Query, QueryOpts) ->
tdengine:insert(Con, Query, QueryOpts). tdengine:insert(Con, Query, QueryOpts).
is_success_check(Result) ->
?assertMatch({ok, #{<<"code">> := 0}}, Result).
to_str(Atom) when is_atom(Atom) ->
erlang:atom_to_list(Atom).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_setup_via_config_and_publish(Config) -> t_create_via_http(Config) ->
?assertMatch( emqx_bridge_v2_testlib:t_create_via_http(Config).
{ok, _},
create_bridge(Config)
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
?check_trace(
begin
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
),
?assertMatch(
[[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config)
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(tdengine_connector_query_return, Trace0),
?assertMatch([#{result := {ok, #{<<"code">> := 0, <<"rows">> := 1}}}], Trace),
ok
end
),
ok.
t_setup_via_http_api_and_publish(Config) -> t_on_get_status(Config) ->
BridgeType = ?config(tdengine_bridge_type, Config), emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
Name = ?config(tdengine_name, Config),
QueryMode = ?config(query_mode, Config),
TDengineConfig0 = ?config(tdengine_config, Config),
TDengineConfig = TDengineConfig0#{
<<"name">> => Name,
<<"type">> => BridgeType
},
?assertMatch(
{ok, _},
create_bridge_http(TDengineConfig)
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, t_start_stop(Config) ->
?check_trace( emqx_bridge_v2_testlib:t_start_stop(Config, tdengine_connector_stop).
begin
Request = {send_message, SentData},
Res0 =
case QueryMode of
sync ->
query_resource(Config, Request);
async ->
{_, Ref} = query_resource_async(Config, Request),
{ok, Res} = receive_result(Ref, 2_000),
Res
end,
?assertMatch( t_invalid_data(Config) ->
{ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0 MakeMessageFun = fun() -> #{} end,
), IsSuccessCheck = fun(Result) ->
?assertMatch(
[[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config)
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(tdengine_connector_query_return, Trace0),
?assertMatch([#{result := {ok, #{<<"code">> := 0, <<"rows">> := 1}}}], Trace),
ok
end
),
ok.
t_get_status(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
Name = ?config(tdengine_name, Config),
BridgeType = ?config(tdengine_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch( ?assertMatch(
{ok, Status} when Status =:= disconnected orelse Status =:= connecting, {error, #{
emqx_resource_manager:health_check(ResourceID) <<"code">> := 534,
<<"desc">> := _
}},
Result
) )
end),
ok.
t_write_failure(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= closed ->
ok;
_ ->
throw({unexpected, Result})
end,
ok
end),
ok.
% This test doesn't work with batch enabled since it is not possible
% to set the timeout directly for batch queries
t_write_timeout(Config) ->
ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(
Config,
#{
<<"resource_opts">> => #{
<<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => <<"100ms">>
}
}
),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
%% FIXME: TDengine connector hangs indefinetily during
%% `call_query' while the connection is unresponsive. Should add
%% a timeout to `APPLY_RESOURCE' in buffer worker??
case QueryMode of
sync ->
emqx_common_test_helpers:with_failure(
timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData})
)
end
);
async ->
ct:comment("tdengine connector hangs the buffer worker forever")
end, end,
ok. ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, IsSuccessCheck, tdengine_connector_query_return
t_simple_sql_query(Config) ->
EnableBatch = ?config(enable_batch, Config),
?assertMatch(
{ok, _},
create_bridge(Config)
), ),
Request = {query, <<"SELECT 1 AS T">>},
{_, {ok, #{result := Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
case EnableBatch of
true ->
?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result)
end,
ok. ok.
t_missing_data(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{error, #{
<<"code">> := 534,
<<"desc">> := _
}},
Result
),
ok.
t_bad_sql_parameter(Config) ->
?assertMatch(
{ok, _},
create_bridge(Config)
),
Request = {send_message, <<"">>},
{_, {ok, #{result := Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
ok.
%% TODO
%% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders,
%% the SQL quote(escape) is removed now,
%% we should introduce a new syntax for placeholders to allow some vars to keep unquote.
%% t_nasty_sql_string(Config) ->
%% ?assertMatch(
%% {ok, _},
%% create_bridge(Config)
%% ),
%% % NOTE
%% % Column `payload` has BINARY type, so we would certainly like to test it
%% % with `lists:seq(1, 127)`, but:
%% % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
%% % parser[1] has no escaping sequence for it so a zero byte probably confuses
%% % interpreter somewhere down the line.
%% % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
%% % some reason.
%% %
%% % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
%% Payload = list_to_binary(lists:seq(1, 127)),
%% Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
%% {_, {ok, #{result := Result}}} =
%% ?wait_async_action(
%% send_message(Config, Message),
%% #{?snk_kind := buffer_worker_flush_ack},
%% 2_000
%% ),
%% ?assertMatch(
%% {ok, #{<<"code">> := 0, <<"rows">> := 1}},
%% Result
%% ),
%% ?assertEqual(
%% Payload,
%% connect_and_get_payload(Config)
%% ).
t_simple_insert(Config) -> t_simple_insert(Config) ->
connect_and_clear_table(Config), connect_and_clear_table(Config),
?assertMatch(
{ok, _}, MakeMessageFun = fun() ->
create_bridge(Config) #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}
end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return
), ),
SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
Request = {send_message, SentData},
{_, {ok, #{result := _Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch( ?assertMatch(
[[?PAYLOAD], [?PAYLOAD]], [[?PAYLOAD], [?PAYLOAD]],
connect_and_get_payload(Config) connect_and_get_payload(Config)
@ -598,10 +366,7 @@ t_simple_insert(Config) ->
t_batch_insert(Config) -> t_batch_insert(Config) ->
connect_and_clear_table(Config), connect_and_clear_table(Config),
?assertMatch( ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
{ok, _},
create_bridge(Config)
),
Size = 5, Size = 5,
Ts = erlang:system_time(millisecond), Ts = erlang:system_time(millisecond),
@ -612,8 +377,7 @@ t_batch_insert(Config) ->
SentData = #{ SentData = #{
payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000 payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000
}, },
Request = {send_message, SentData}, send_message(Config, SentData)
query_resource(Config, Request)
end, end,
lists:seq(1, Size) lists:seq(1, Size)
), ),
@ -632,27 +396,22 @@ t_batch_insert(Config) ->
) )
). ).
t_auto_create_simple_insert(Config0) -> t_auto_create_simple_insert(Config) ->
ClientId = to_str(?FUNCTION_NAME), ClientId = to_str(?FUNCTION_NAME),
Config = get_auto_create_config(Config0),
?assertMatch( MakeMessageFun = fun() ->
{ok, _}, #{
create_bridge(Config) payload => ?PAYLOAD,
timestamp => 1668602148000,
second_ts => 1668602148000 + 100,
clientid => ClientId
}
end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return
), ),
SentData = #{
payload => ?PAYLOAD,
timestamp => 1668602148000,
second_ts => 1668602148000 + 100,
clientid => ClientId
},
Request = {send_message, SentData},
{_, {ok, #{result := _Result}}} =
?wait_async_action(
query_resource(Config, Request),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch( ?assertMatch(
[[?PAYLOAD]], [[?PAYLOAD]],
connect_and_query(Config, "SELECT payload FROM " ++ ClientId) connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
@ -673,15 +432,10 @@ t_auto_create_simple_insert(Config0) ->
connect_and_query(Config, "DROP TABLE test_" ++ ClientId) connect_and_query(Config, "DROP TABLE test_" ++ ClientId)
). ).
t_auto_create_batch_insert(Config0) -> t_auto_create_batch_insert(Config) ->
ClientId1 = "client1", ClientId1 = "client1",
ClientId2 = "client2", ClientId2 = "client2",
Config = get_auto_create_config(Config0), ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
?assertMatch(
{ok, _},
create_bridge(Config)
),
Size1 = 2, Size1 = 2,
Size2 = 3, Size2 = 3,
@ -699,8 +453,7 @@ t_auto_create_batch_insert(Config0) ->
second_ts => Ts + Idx + Offset + 5000, second_ts => Ts + Idx + Offset + 5000,
clientid => ClientId clientid => ClientId
}, },
Request = {send_message, SentData}, send_message(Config, SentData)
query_resource(Config, Request)
end, end,
lists:seq(1, Size) lists:seq(1, Size)
) )
@ -738,17 +491,3 @@ t_auto_create_batch_insert(Config0) ->
end, end,
[ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2] [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2]
). ).
to_bin(List) when is_list(List) ->
unicode:characters_to_binary(List, utf8);
to_bin(Bin) when is_binary(Bin) ->
Bin.
to_str(Atom) when is_atom(Atom) ->
erlang:atom_to_list(Atom).
get_auto_create_config(Config0) ->
Config = lists:keyreplace(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}),
BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>),
{_Name, TDConf} = tdengine_config(BridgeType, Config),
lists:keyreplace(tdengine_config, 1, Config, {tdengine_config, TDConf}).

View File

@ -60,6 +60,8 @@ resource_type(opents) ->
emqx_bridge_opents_connector; emqx_bridge_opents_connector;
resource_type(greptimedb) -> resource_type(greptimedb) ->
emqx_bridge_greptimedb_connector; emqx_bridge_greptimedb_connector;
resource_type(tdengine) ->
emqx_bridge_tdengine_connector;
resource_type(Type) -> resource_type(Type) ->
error({unknown_connector_type, Type}). error({unknown_connector_type, Type}).
@ -76,6 +78,8 @@ connector_impl_module(elasticsearch) ->
emqx_bridge_es_connector; emqx_bridge_es_connector;
connector_impl_module(opents) -> connector_impl_module(opents) ->
emqx_bridge_opents_connector; emqx_bridge_opents_connector;
connector_impl_module(tdengine) ->
emqx_bridge_tdengine_connector;
connector_impl_module(_ConnectorType) -> connector_impl_module(_ConnectorType) ->
undefined. undefined.
@ -235,6 +239,14 @@ connector_structs() ->
desc => <<"GreptimeDB Connector Config">>, desc => <<"GreptimeDB Connector Config">>,
required => false required => false
} }
)},
{tdengine,
mk(
hoconsc:map(name, ref(emqx_bridge_tdengine_connector, "config_connector")),
#{
desc => <<"TDengine Connector Config">>,
required => false
}
)} )}
]. ].
@ -258,7 +270,8 @@ schema_modules() ->
emqx_bridge_iotdb_connector, emqx_bridge_iotdb_connector,
emqx_bridge_es_connector, emqx_bridge_es_connector,
emqx_bridge_opents_connector, emqx_bridge_opents_connector,
emqx_bridge_greptimedb emqx_bridge_greptimedb,
emqx_bridge_tdengine_connector
]. ].
api_schemas(Method) -> api_schemas(Method) ->
@ -291,7 +304,8 @@ api_schemas(Method) ->
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector") api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"),
api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method)
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->

View File

@ -162,7 +162,9 @@ connector_type_to_bridge_types(elasticsearch) ->
connector_type_to_bridge_types(opents) -> connector_type_to_bridge_types(opents) ->
[opents]; [opents];
connector_type_to_bridge_types(greptimedb) -> connector_type_to_bridge_types(greptimedb) ->
[greptimedb]. [greptimedb];
connector_type_to_bridge_types(tdengine) ->
[tdengine].
actions_config_name(action) -> <<"actions">>; actions_config_name(action) -> <<"actions">>;
actions_config_name(source) -> <<"sources">>. actions_config_name(source) -> <<"sources">>.

View File

@ -40,4 +40,10 @@ sql_template.desc:
sql_template.label: sql_template.label:
"""SQL Template""" """SQL Template"""
action_parameters.desc:
"""Tdengine action parameters"""
action_parameters.label:
"""Parameters"""
} }

View File

@ -8,4 +8,10 @@ The TDengine default port 6041 is used if `[:Port]` is not specified."""
server.label: server.label:
"""Server Host""" """Server Host"""
desc_config.desc:
"""Configuration for TDengine Connector."""
desc_config.label:
"""TDengine Connector Configuration"""
} }