From dfad020c495b165fe026d4e45cf788f47a3e1ef6 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 1 Feb 2024 23:12:59 +0800 Subject: [PATCH 1/2] feat(tdengine): improve the TDengine bridge to v2 style --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/emqx_bridge_opents_connector.erl | 2 +- .../src/emqx_bridge_tdengine.erl | 137 +++-- .../src/emqx_bridge_tdengine_action_info.erl | 71 +++ .../src/emqx_bridge_tdengine_connector.erl | 202 ++++-- .../test/emqx_bridge_tdengine_SUITE.erl | 573 +++++------------- .../src/schema/emqx_connector_ee_schema.erl | 18 +- .../src/schema/emqx_connector_schema.erl | 4 +- rel/i18n/emqx_bridge_tdengine.hocon | 6 + rel/i18n/emqx_bridge_tdengine_connector.hocon | 6 + 10 files changed, 492 insertions(+), 530 deletions(-) create mode 100644 apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index b495fa671..b83fa92bf 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -102,7 +102,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_iotdb_action_info, emqx_bridge_es_action_info, emqx_bridge_opents_action_info, - emqx_bridge_greptimedb_action_info + emqx_bridge_greptimedb_action_info, + emqx_bridge_tdengine_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index faa8c769c..68bdfc9ef 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -74,7 +74,7 @@ desc(connector_resource_opts) -> desc("config_connector") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> - ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; + ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; desc(_) -> undefined. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl index da170e943..3025ff55e 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl @@ -9,35 +9,21 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). --export([ - conn_bridge_examples/1, - values/1 -]). - --export([ - namespace/0, - roots/0, - fields/1, - desc/1 -]). +-export([conn_bridge_examples/1, values/1, bridge_v2_examples/1]). +-export([namespace/0, roots/0, fields/1, desc/1]). -define(DEFAULT_SQL, << - "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) " - "values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})" + "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, " + "arrived) values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', " + "${timestamp})" >>). +-define(CONNECTOR_TYPE, tdengine). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). %% ------------------------------------------------------------------------------------------------- -%% api - +%% v1 examples conn_bridge_examples(Method) -> - [ - #{ - <<"tdengine">> => #{ - summary => <<"TDengine Bridge">>, - value => values(Method) - } - } - ]. + [#{<<"tdengine">> => #{summary => <<"TDengine Bridge">>, value => values(Method)}}]. values(_Method) -> #{ @@ -51,21 +37,46 @@ values(_Method) -> password => <<"******">>, sql => ?DEFAULT_SQL, local_topic => <<"local/topic/#">>, - resource_opts => #{ - worker_pool_size => 8, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => sync, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + resource_opts => + #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% v2 examples +bridge_v2_examples(Method) -> + [ + #{ + <<"tdengine">> => #{ + summary => <<"TDengine Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + database => <<"mqtt">>, + sql => ?DEFAULT_SQL } }. %% ------------------------------------------------------------------------------------------------- -%% Hocon Schema Definitions -namespace() -> "bridge_tdengine". +%% v1 Hocon Schema Definitions +namespace() -> + "bridge_tdengine". -roots() -> []. +roots() -> + []. fields("config") -> [ @@ -73,24 +84,68 @@ fields("config") -> {sql, mk( binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + #{ + desc => ?DESC("sql_template"), + default => ?DEFAULT_SQL, + format => <<"sql">> + } )}, - {local_topic, - mk( - binary(), - #{desc => ?DESC("local_topic"), default => undefined} - )} - ] ++ emqx_resource_schema:fields("resource_opts") ++ + {local_topic, mk(binary(), #{desc => ?DESC("local_topic"), default => undefined})} + ] ++ + emqx_resource_schema:fields("resource_opts") ++ emqx_bridge_tdengine_connector:fields(config); fields("post") -> [type_field(), name_field() | fields("config")]; fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"); +%% ------------------------------------------------------------------------------------------------- +%% v2 Hocon Schema Definitions +fields(action) -> + {tdengine, + mk( + hoconsc:map(name, ref(?MODULE, action_config)), + #{ + desc => <<"TDengine Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {database, fun emqx_connector_schema_lib:database/1}, + {sql, + mk( + binary(), + #{ + desc => ?DESC("sql_template"), + default => ?DEFAULT_SQL, + format => <<"sql">> + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(enum([tdengine])) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). desc("config") -> ?DESC("desc_config"); +desc(action_config) -> + ?DESC("desc_config"); +desc(action_parameters) -> + ?DESC("action_parameters"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for TDengine using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_action_info.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_action_info.erl new file mode 100644 index 000000000..11db9c52e --- /dev/null +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_action_info.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_tdengine_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(ACTION_TYPE, tdengine). +-define(SCHEMA_MODULE, emqx_bridge_tdengine). + +action_type_name() -> ?ACTION_TYPE. +bridge_v1_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + MergedConfig = + emqx_utils_maps:deep_merge( + maps:without( + [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>], + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) + ), + ConnectorConfig + ), + BridgeV1Keys = schema_keys("config"), + maps:with(BridgeV1Keys, MergedConfig). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(action_config), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys(emqx_bridge_tdengine_connector, "config_connector"), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + schema_keys(?SCHEMA_MODULE, Name). + +schema_keys(Mod, Name) -> + [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))]. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 522007cbc..d35be0f2e 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -11,7 +11,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --export([roots/0, fields/1]). +-export([namespace/0, roots/0, fields/1, desc/1]). %% `emqx_resource' API -export([ @@ -20,9 +20,15 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([connector_examples/1]). + -export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -31,8 +37,12 @@ default_port => 6041 }). +-define(CONNECTOR_TYPE, tdengine). + +namespace() -> "tdengine_connector". + %%===================================================================== -%% Hocon schema +%% V1 Hocon schema roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. @@ -40,17 +50,45 @@ fields(config) -> [ {server, server()} | adjust_fields(emqx_connector_schema_lib:relational_db_fields()) - ]. + ]; +%%===================================================================== +%% V2 Hocon schema + +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields("post") -> + emqx_connector_schema:type_and_name_fields(enum([tdengine])) ++ fields("config_connector"); +fields("put") -> + fields("config_connector"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); +desc("config_connector") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for TDengine using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. adjust_fields(Fields) -> - lists:map( + lists:filtermap( fun ({username, OrigUsernameFn}) -> - {username, add_default_fn(OrigUsernameFn, <<"root">>)}; + {true, {username, add_default_fn(OrigUsernameFn, <<"root">>)}}; ({password, _}) -> - {password, emqx_connector_schema_lib:password_field(#{required => true})}; - (Field) -> - Field + {true, {password, emqx_connector_schema_lib:password_field(#{required => true})}}; + ({database, _}) -> + false; + (_Field) -> + true end, Fields ). @@ -65,6 +103,32 @@ server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?TD_HOST_OPTIONS). +%%===================================================================== +%% V2 Hocon schema +connector_examples(Method) -> + [ + #{ + <<"tdengine">> => + #{ + summary => <<"TDengine Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"tdengine_connector">>, + type => tdengine, + enable => true, + server => <<"127.0.0.1:6041">>, + pool_size => 8, + username => <<"root">>, + password => <<"******">> + }. + %%======================================================================================== %% `emqx_resource' API %%======================================================================================== @@ -93,11 +157,10 @@ on_start( {username, Username}, {password, Password}, {pool_size, PoolSize}, - {pool, binary_to_atom(InstanceId, utf8)} + {pool, InstanceId} ], - Prepares = parse_prepare_sql(Config), - State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, + State = #{pool_name => InstanceId, channels => #{}}, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -110,34 +173,33 @@ on_stop(InstanceId, _State) -> msg => "stopping_tdengine_connector", connector => InstanceId }), + ?tp(tdengine_connector_stop, #{instance_id => InstanceId}), emqx_resource_pool:stop(InstanceId). -on_query(InstanceId, {query, SQL}, State) -> - do_query(InstanceId, SQL, State); -on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> - case maps:find(Key, InsertTksMap) of - {ok, Tokens} when is_map(Data) -> - SQL = emqx_placeholder:proc_tmpl(Tokens, Data), - do_query(InstanceId, SQL, State); +on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) -> + case maps:find(ChannelId, Channels) of + {ok, #{insert := Tokens, opts := Opts}} -> + Query = emqx_placeholder:proc_tmpl(Tokens, Data), + do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State); _ -> - {error, {unrecoverable_error, invalid_request}} + {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}} end. %% aggregate the batch queries to one SQL is a heavy job, we should put it in the worker process on_batch_query( InstanceId, - [{Key, _Data = #{}} | _] = BatchReq, - #{batch_tokens := BatchTksMap, query_opts := Opts} = State + [{ChannelId, _Data = #{}} | _] = BatchReq, + #{channels := Channels} = State ) -> - case maps:find(Key, BatchTksMap) of - {ok, Tokens} -> + case maps:find(ChannelId, Channels) of + {ok, #{batch := Tokens, opts := Opts}} -> do_query_job( InstanceId, {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]}, State ); _ -> - {error, {unrecoverable_error, batch_prepare_not_implemented}} + {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}} end; on_batch_query(InstanceId, BatchReq, State) -> LogMeta = #{connector => InstanceId, request => BatchReq, state => State}, @@ -157,13 +219,46 @@ do_get_status(Conn) -> status_result(_Status = true) -> connected; status_result(_Status = false) -> connecting. +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{ + parameters := #{database := Database, sql := SQL} + } +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + case parse_prepare_sql(SQL) of + {ok, Result} -> + Opts = [{db_name, Database}], + Channels2 = Channels#{ChannelId => Result#{opts => Opts}}, + {ok, OldState#{channels := Channels2}}; + Error -> + Error + end + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) -> + {ok, OldState#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) -> + case maps:is_key(ChannelId, Channels) of + true -> + on_get_status(InstanceId, State); + _ -> + {error, not_exists} + end. + %%======================================================================================== %% Helper fns %%======================================================================================== -do_query(InstanceId, Query, #{query_opts := Opts} = State) -> - do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State). - do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> ?TRACE( "QUERY", @@ -171,12 +266,11 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> #{connector => InstanceId, job => Job, state => State} ), Result = ecpool:pick_and_do(PoolName, Job, no_handover), - case Result of {error, Reason} -> ?tp( tdengine_connector_query_return, - #{error => Reason} + #{instance_id => InstanceId, error => Reason} ), ?SLOG(error, #{ msg => "tdengine_connector_do_query_failed", @@ -193,7 +287,7 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) -> _ -> ?tp( tdengine_connector_query_return, - #{result => Result} + #{instance_id => InstanceId, result => Result} ), Result end. @@ -221,49 +315,23 @@ connect(Opts) -> NOpts = [{password, emqx_secret:unwrap(Secret)} | OptsRest], tdengine:start_link(NOpts). -query_opts(#{database := Database} = _Opts) -> - [{db_name, Database}]. - -parse_prepare_sql(Config) -> - SQL = - case maps:get(sql, Config, undefined) of - undefined -> #{}; - Template -> #{send_message => Template} - end, - - parse_batch_prepare_sql(maps:to_list(SQL), #{}, #{}). - -parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> - case emqx_utils_sql:get_statement_type(H) of - select -> - parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); +parse_prepare_sql(SQL) -> + case emqx_utils_sql:get_statement_type(SQL) of insert -> - InsertTks = emqx_placeholder:preproc_tmpl(H), - H1 = string:trim(H, trailing, ";"), - case split_insert_sql(H1) of + InsertTks = emqx_placeholder:preproc_tmpl(SQL), + SQL1 = string:trim(SQL, trailing, ";"), + case split_insert_sql(SQL1) of [_InsertPart, BatchDesc] -> BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc), - parse_batch_prepare_sql( - T, - InsertTksMap#{Key => InsertTks}, - BatchTksMap#{Key => BatchTks} - ); + {ok, #{insert => InsertTks, batch => BatchTks}}; Result -> - ?SLOG(error, #{msg => "split_sql_failed", sql => H, result => Result}), - parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) + {error, #{msg => "split_sql_failed", sql => SQL, result => Result}} end; Type when is_atom(Type) -> - ?SLOG(error, #{msg => "detect_sql_type_unsupported", sql => H, type => Type}), - parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap); + {error, #{msg => "detect_sql_type_unsupported", sql => SQL, type => Type}}; {error, Reason} -> - ?SLOG(error, #{msg => "detect_sql_type_failed", sql => H, reason => Reason}), - parse_batch_prepare_sql(T, InsertTksMap, BatchTksMap) - end; -parse_batch_prepare_sql([], InsertTksMap, BatchTksMap) -> - #{ - insert_tokens => InsertTksMap, - batch_tokens => BatchTksMap - }. + {error, #{msg => "detect_sql_type_failed", sql => SQL, reason => Reason}} + end. to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8). diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 92ad3a611..610eca714 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -54,6 +54,11 @@ ok = tdengine:stop(Con) ). +-define(BRIDGE_TYPE_BIN, <<"tdengine">>). +-define(APPS, [ + hackney, tdengine, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_tdengine +]). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -66,16 +71,21 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - NonBatchCases = [t_write_timeout], MustBatchCases = [t_batch_insert, t_auto_create_batch_insert], BatchingGroups = [{group, with_batch}, {group, without_batch}], [ {async, BatchingGroups}, {sync, BatchingGroups}, - {with_batch, TCs -- NonBatchCases}, + {with_batch, TCs}, {without_batch, TCs -- MustBatchCases} ]. +init_per_suite(Config) -> + emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS). + +end_per_suite(Config) -> + emqx_bridge_v2_testlib:end_per_suite(Config). + init_per_group(async, Config) -> [{query_mode, async} | Config]; init_per_group(sync, Config) -> @@ -89,36 +99,37 @@ init_per_group(without_batch, Config0) -> init_per_group(_Group, Config) -> Config. -end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> - connect_and_drop_table(Config), - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), +end_per_group(default, Config) -> + emqx_bridge_v2_testlib:end_per_group(Config), ok; end_per_group(_Group, _Config) -> ok. -init_per_suite(Config) -> +init_per_testcase(TestCase, Config0) -> + connect_and_clear_table(Config0), + Type = ?config(bridge_type, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + {_ConfigString, ConnectorConfig} = connector_config(Name, Config0), + {_, ActionConfig} = action_config(TestCase, Name, Config0), + Config = [ + {connector_type, Type}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, Type}, + {bridge_name, Name}, + {bridge_config, ActionConfig} + | Config0 + ], + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + ok = snabbkaffe:start_trace(), Config. -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), - ok. - -init_per_testcase(_Testcase, Config) -> +end_per_testcase(TestCase, Config) -> + emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config), connect_and_clear_table(Config), - delete_bridge(Config), - snabbkaffe:start_trace(), - Config. - -end_per_testcase(_Testcase, Config) -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - connect_and_clear_table(Config), - ok = snabbkaffe:stop(), - delete_bridge(Config), ok. %%------------------------------------------------------------------------------ @@ -132,34 +143,14 @@ common_init(ConfigT) -> Config0 = [ {td_host, Host}, {td_port, Port}, - {proxy_name, "tdengine_restful"}, - {template, ?SQL_BRIDGE} + {proxy_name, "tdengine_restful"} | ConfigT ], - BridgeType = proplists:get_value(bridge_type, Config0, <<"tdengine">>), case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - % Setup toxiproxy - ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), - ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, tdengine]), - _ = emqx_bridge_enterprise:module_info(), - emqx_mgmt_api_test_util:init_suite(), - % Connect to tdengine directly and create the table - connect_and_create_table(Config0), - {Name, TDConf} = tdengine_config(BridgeType, Config0), - Config = - [ - {tdengine_config, TDConf}, - {tdengine_bridge_type, BridgeType}, - {tdengine_name, Name}, - {proxy_host, ProxyHost}, - {proxy_port, ProxyPort} - | Config0 - ], + Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0), + connect_and_create_table(Config), Config; false -> case os:getenv("IS_CI") of @@ -170,97 +161,100 @@ common_init(ConfigT) -> end end. -tdengine_config(BridgeType, Config) -> - Port = integer_to_list(?config(td_port, Config)), - Server = ?config(td_host, Config) ++ ":" ++ Port, - Name = atom_to_binary(?MODULE), +action_config(TestCase, Name, Config) -> + Type = ?config(bridge_type, Config), BatchSize = case ?config(enable_batch, Config) of true -> ?BATCH_SIZE; false -> 1 end, QueryMode = ?config(query_mode, Config), - Template = ?config(template, Config), ConfigString = io_lib:format( - "bridges.~s.~s {\n" + "actions.~s.~s {\n" " enable = true\n" - " server = ~p\n" - " database = ~p\n" - " username = ~p\n" - " password = ~p\n" - " sql = ~p\n" + " connector = \"~s\"\n" + " parameters = {\n" + " database = ~p\n" + " sql = ~p\n" + " }\n" " resource_opts = {\n" " request_ttl = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" - "}", + "}\n", [ - BridgeType, + Type, + Name, Name, - Server, ?TD_DATABASE, - ?TD_USERNAME, - ?TD_PASSWORD, - Template, + case TestCase of + Auto when + Auto =:= t_auto_create_simple_insert; Auto =:= t_auto_create_batch_insert + -> + ?AUTO_CREATE_BRIDGE; + _ -> + ?SQL_BRIDGE + end, BatchSize, QueryMode ] ), - {Name, parse_and_check(ConfigString, BridgeType, Name)}. + ct:pal("ActionConfig:~ts~n", [ConfigString]), + {ConfigString, parse_action_and_check(ConfigString, Type, Name)}. -parse_and_check(ConfigString, BridgeType, Name) -> +connector_config(Name, Config) -> + Host = ?config(td_host, Config), + Port = ?config(td_port, Config), + Type = ?config(bridge_type, Config), + Server = Host ++ ":" ++ integer_to_list(Port), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " server = \"~s\"\n" + " username = ~p\n" + " password = ~p\n" + "}\n", + [ + Type, + Name, + Server, + ?TD_USERNAME, + ?TD_PASSWORD + ] + ), + ct:pal("ConnectorConfig:~ts~n", [ConfigString]), + {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}. + +parse_action_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). + +parse_connector_and_check(ConfigString, ConnectorType, Name) -> + parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ). + +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, Config. -create_bridge(Config) -> - create_bridge(Config, _Overrides = #{}). - -create_bridge(Config, Overrides) -> - BridgeType = ?config(tdengine_bridge_type, Config), - Name = ?config(tdengine_name, Config), - TDConfig0 = ?config(tdengine_config, Config), - TDConfig = emqx_utils_maps:deep_merge(TDConfig0, Overrides), - emqx_bridge:create(BridgeType, Name, TDConfig). - -delete_bridge(Config) -> - BridgeType = ?config(tdengine_bridge_type, Config), - Name = ?config(tdengine_name, Config), - emqx_bridge:remove(BridgeType, Name). - -create_bridge_http(Params) -> - Path = emqx_mgmt_api_test_util:api_path(["bridges"]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error - end. +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +to_bin(Bin) when is_binary(Bin) -> + Bin. send_message(Config, Payload) -> - Name = ?config(tdengine_name, Config), - BridgeType = ?config(tdengine_bridge_type, Config), - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). - -query_resource(Config, Request) -> - Name = ?config(tdengine_name, Config), - BridgeType = ?config(tdengine_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). - -query_resource_async(Config, Request) -> - Name = ?config(tdengine_name, Config), - BridgeType = ?config(tdengine_bridge_type, Config), - Ref = alias([reply]), - AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - Return = emqx_resource:query(ResourceID, Request, #{ - timeout => 500, async_reply_fun => {AsyncReplyFun, []} - }), - {Return, Ref}. + BridgeType = ?config(bridge_type, Config), + Name = ?config(bridge_name, Config), + ct:print(">>> Name:~p~n BridgeType:~p~n", [Name, BridgeType]), + emqx_bridge_v2:send_message(BridgeType, Name, Payload, #{}). receive_result(Ref, Timeout) -> receive @@ -287,17 +281,13 @@ connect_direct_tdengine(Config) -> % These funs connect and then stop the tdengine connection connect_and_create_table(Config) -> ?WITH_CON(begin + {ok, _} = directly_query(Con, ?SQL_DROP_TABLE), + {ok, _} = directly_query(Con, ?SQL_DROP_STABLE), {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE), {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE) end). -connect_and_drop_table(Config) -> - ?WITH_CON(begin - {ok, _} = directly_query(Con, ?SQL_DROP_TABLE), - {ok, _} = directly_query(Con, ?SQL_DROP_STABLE) - end). - connect_and_clear_table(Config) -> ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)). @@ -322,275 +312,53 @@ directly_query(Con, Query) -> directly_query(Con, Query, QueryOpts) -> tdengine:insert(Con, Query, QueryOpts). +is_success_check(Result) -> + ?assertMatch({ok, #{<<"code">> := 0}}, Result). + +to_str(Atom) when is_atom(Atom) -> + erlang:atom_to_list(Atom). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_setup_via_config_and_publish(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, - ?check_trace( - begin - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, SentData), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result - ), - ?assertMatch( - [[?PAYLOAD], [?PAYLOAD]], - connect_and_get_payload(Config) - ), - ok - end, - fun(Trace0) -> - Trace = ?of_kind(tdengine_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, #{<<"code">> := 0, <<"rows">> := 1}}}], Trace), - ok - end - ), - ok. +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). -t_setup_via_http_api_and_publish(Config) -> - BridgeType = ?config(tdengine_bridge_type, Config), - Name = ?config(tdengine_name, Config), - QueryMode = ?config(query_mode, Config), - TDengineConfig0 = ?config(tdengine_config, Config), - TDengineConfig = TDengineConfig0#{ - <<"name">> => Name, - <<"type">> => BridgeType - }, - ?assertMatch( - {ok, _}, - create_bridge_http(TDengineConfig) - ), +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, - ?check_trace( - begin - Request = {send_message, SentData}, - Res0 = - case QueryMode of - sync -> - query_resource(Config, Request); - async -> - {_, Ref} = query_resource_async(Config, Request), - {ok, Res} = receive_result(Ref, 2_000), - Res - end, +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, tdengine_connector_stop). - ?assertMatch( - {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0 - ), - ?assertMatch( - [[?PAYLOAD], [?PAYLOAD]], - connect_and_get_payload(Config) - ), - ok - end, - fun(Trace0) -> - Trace = ?of_kind(tdengine_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, #{<<"code">> := 0, <<"rows">> := 1}}}], Trace), - ok - end - ), - ok. - -t_get_status(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - ProxyPort = ?config(proxy_port, Config), - ProxyHost = ?config(proxy_host, Config), - ProxyName = ?config(proxy_name, Config), - - Name = ?config(tdengine_name, Config), - BridgeType = ?config(tdengine_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> +t_invalid_data(Config) -> + MakeMessageFun = fun() -> #{} end, + IsSuccessCheck = fun(Result) -> ?assertMatch( - {ok, Status} when Status =:= disconnected orelse Status =:= connecting, - emqx_resource_manager:health_check(ResourceID) + {error, #{ + <<"code">> := 534, + <<"desc">> := _ + }}, + Result ) - end), - ok. - -t_write_failure(Config) -> - ProxyName = ?config(proxy_name, Config), - ProxyPort = ?config(proxy_port, Config), - ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, SentData), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= closed -> - ok; - _ -> - throw({unexpected, Result}) - end, - ok - end), - ok. - -% This test doesn't work with batch enabled since it is not possible -% to set the timeout directly for batch queries -t_write_timeout(Config) -> - ProxyName = ?config(proxy_name, Config), - ProxyPort = ?config(proxy_port, Config), - ProxyHost = ?config(proxy_host, Config), - QueryMode = ?config(query_mode, Config), - {ok, _} = create_bridge( - Config, - #{ - <<"resource_opts">> => #{ - <<"request_ttl">> => <<"500ms">>, - <<"resume_interval">> => <<"100ms">>, - <<"health_check_interval">> => <<"100ms">> - } - } - ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, - %% FIXME: TDengine connector hangs indefinetily during - %% `call_query' while the connection is unresponsive. Should add - %% a timeout to `APPLY_RESOURCE' in buffer worker?? - case QueryMode of - sync -> - emqx_common_test_helpers:with_failure( - timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData}) - ) - end - ); - async -> - ct:comment("tdengine connector hangs the buffer worker forever") end, - ok. - -t_simple_sql_query(Config) -> - EnableBatch = ?config(enable_batch, Config), - ?assertMatch( - {ok, _}, - create_bridge(Config) + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, tdengine_connector_query_return ), - Request = {query, <<"SELECT 1 AS T">>}, - {_, {ok, #{result := Result}}} = - ?wait_async_action( - query_resource(Config, Request), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - case EnableBatch of - true -> - ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); - false -> - ?assertMatch({ok, #{<<"code">> := 0, <<"data">> := [[1]]}}, Result) - end, + ok. -t_missing_data(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, #{}), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - {error, #{ - <<"code">> := 534, - <<"desc">> := _ - }}, - Result - ), - ok. - -t_bad_sql_parameter(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - Request = {send_message, <<"">>}, - {_, {ok, #{result := Result}}} = - ?wait_async_action( - query_resource(Config, Request), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - - ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result), - ok. - -%% TODO -%% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders, -%% the SQL quote(escape) is removed now, -%% we should introduce a new syntax for placeholders to allow some vars to keep unquote. -%% t_nasty_sql_string(Config) -> -%% ?assertMatch( -%% {ok, _}, -%% create_bridge(Config) -%% ), -%% % NOTE -%% % Column `payload` has BINARY type, so we would certainly like to test it -%% % with `lists:seq(1, 127)`, but: -%% % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's -%% % parser[1] has no escaping sequence for it so a zero byte probably confuses -%% % interpreter somewhere down the line. -%% % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for -%% % some reason. -%% % -%% % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301 -%% Payload = list_to_binary(lists:seq(1, 127)), -%% Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, -%% {_, {ok, #{result := Result}}} = -%% ?wait_async_action( -%% send_message(Config, Message), -%% #{?snk_kind := buffer_worker_flush_ack}, -%% 2_000 -%% ), -%% ?assertMatch( -%% {ok, #{<<"code">> := 0, <<"rows">> := 1}}, -%% Result -%% ), -%% ?assertEqual( -%% Payload, -%% connect_and_get_payload(Config) -%% ). - t_simple_insert(Config) -> connect_and_clear_table(Config), - ?assertMatch( - {ok, _}, - create_bridge(Config) + + MakeMessageFun = fun() -> + #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010} + end, + + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, - Request = {send_message, SentData}, - {_, {ok, #{result := _Result}}} = - ?wait_async_action( - query_resource(Config, Request), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), ?assertMatch( [[?PAYLOAD], [?PAYLOAD]], connect_and_get_payload(Config) @@ -598,10 +366,7 @@ t_simple_insert(Config) -> t_batch_insert(Config) -> connect_and_clear_table(Config), - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), Size = 5, Ts = erlang:system_time(millisecond), @@ -612,8 +377,7 @@ t_batch_insert(Config) -> SentData = #{ payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000 }, - Request = {send_message, SentData}, - query_resource(Config, Request) + send_message(Config, SentData) end, lists:seq(1, Size) ), @@ -632,27 +396,22 @@ t_batch_insert(Config) -> ) ). -t_auto_create_simple_insert(Config0) -> +t_auto_create_simple_insert(Config) -> ClientId = to_str(?FUNCTION_NAME), - Config = get_auto_create_config(Config0), - ?assertMatch( - {ok, _}, - create_bridge(Config) + + MakeMessageFun = fun() -> + #{ + payload => ?PAYLOAD, + timestamp => 1668602148000, + second_ts => 1668602148000 + 100, + clientid => ClientId + } + end, + + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_success_check/1, tdengine_connector_query_return ), - SentData = #{ - payload => ?PAYLOAD, - timestamp => 1668602148000, - second_ts => 1668602148000 + 100, - clientid => ClientId - }, - Request = {send_message, SentData}, - {_, {ok, #{result := _Result}}} = - ?wait_async_action( - query_resource(Config, Request), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), ?assertMatch( [[?PAYLOAD]], connect_and_query(Config, "SELECT payload FROM " ++ ClientId) @@ -673,15 +432,10 @@ t_auto_create_simple_insert(Config0) -> connect_and_query(Config, "DROP TABLE test_" ++ ClientId) ). -t_auto_create_batch_insert(Config0) -> +t_auto_create_batch_insert(Config) -> ClientId1 = "client1", ClientId2 = "client2", - Config = get_auto_create_config(Config0), - - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), Size1 = 2, Size2 = 3, @@ -699,8 +453,7 @@ t_auto_create_batch_insert(Config0) -> second_ts => Ts + Idx + Offset + 5000, clientid => ClientId }, - Request = {send_message, SentData}, - query_resource(Config, Request) + send_message(Config, SentData) end, lists:seq(1, Size) ) @@ -738,17 +491,3 @@ t_auto_create_batch_insert(Config0) -> end, [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2] ). - -to_bin(List) when is_list(List) -> - unicode:characters_to_binary(List, utf8); -to_bin(Bin) when is_binary(Bin) -> - Bin. - -to_str(Atom) when is_atom(Atom) -> - erlang:atom_to_list(Atom). - -get_auto_create_config(Config0) -> - Config = lists:keyreplace(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}), - BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>), - {_Name, TDConf} = tdengine_config(BridgeType, Config), - lists:keyreplace(tdengine_config, 1, Config, {tdengine_config, TDConf}). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 95c9d2991..1a66cda15 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -60,6 +60,8 @@ resource_type(opents) -> emqx_bridge_opents_connector; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; +resource_type(tdengine) -> + emqx_bridge_tdengine_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -76,6 +78,8 @@ connector_impl_module(elasticsearch) -> emqx_bridge_es_connector; connector_impl_module(opents) -> emqx_bridge_opents_connector; +connector_impl_module(tdengine) -> + emqx_bridge_tdengine_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -235,6 +239,14 @@ connector_structs() -> desc => <<"GreptimeDB Connector Config">>, required => false } + )}, + {tdengine, + mk( + hoconsc:map(name, ref(emqx_bridge_tdengine_connector, "config_connector")), + #{ + desc => <<"TDengine Connector Config">>, + required => false + } )} ]. @@ -258,7 +270,8 @@ schema_modules() -> emqx_bridge_iotdb_connector, emqx_bridge_es_connector, emqx_bridge_opents_connector, - emqx_bridge_greptimedb + emqx_bridge_greptimedb, + emqx_bridge_tdengine_connector ]. api_schemas(Method) -> @@ -291,7 +304,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), - api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector") + api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_connector"), + api_ref(emqx_bridge_tdengine_connector, <<"tdengine">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index b7c4d9f74..0b139772d 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -162,7 +162,9 @@ connector_type_to_bridge_types(elasticsearch) -> connector_type_to_bridge_types(opents) -> [opents]; connector_type_to_bridge_types(greptimedb) -> - [greptimedb]. + [greptimedb]; +connector_type_to_bridge_types(tdengine) -> + [tdengine]. actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. diff --git a/rel/i18n/emqx_bridge_tdengine.hocon b/rel/i18n/emqx_bridge_tdengine.hocon index ec6c10779..ac2ac3a7b 100644 --- a/rel/i18n/emqx_bridge_tdengine.hocon +++ b/rel/i18n/emqx_bridge_tdengine.hocon @@ -40,4 +40,10 @@ sql_template.desc: sql_template.label: """SQL Template""" +action_parameters.desc: +"""Tdengine action parameters""" + +action_parameters.label: +"""Parameters""" + } diff --git a/rel/i18n/emqx_bridge_tdengine_connector.hocon b/rel/i18n/emqx_bridge_tdengine_connector.hocon index 9c42dbaa0..ff7340cca 100644 --- a/rel/i18n/emqx_bridge_tdengine_connector.hocon +++ b/rel/i18n/emqx_bridge_tdengine_connector.hocon @@ -8,4 +8,10 @@ The TDengine default port 6041 is used if `[:Port]` is not specified.""" server.label: """Server Host""" +desc_config.desc: +"""Configuration for TDengine Connector.""" + +desc_config.label: +"""TDengine Connector Configuration""" + } From 2241461acba7aa6621fd0cf9e16780df9ec20892 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 1 Feb 2024 23:18:20 +0800 Subject: [PATCH 2/2] chore: update change & bump version --- apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src | 2 +- apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl | 4 ++-- changes/ee/feat-12449.en.md | 1 + rel/i18n/emqx_bridge_tdengine.hocon | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 changes/ee/feat-12449.en.md diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index 5375a6ba9..898a3211d 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 610eca714..511518bda 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -281,8 +281,8 @@ connect_direct_tdengine(Config) -> % These funs connect and then stop the tdengine connection connect_and_create_table(Config) -> ?WITH_CON(begin - {ok, _} = directly_query(Con, ?SQL_DROP_TABLE), - {ok, _} = directly_query(Con, ?SQL_DROP_STABLE), + _ = directly_query(Con, ?SQL_DROP_TABLE), + _ = directly_query(Con, ?SQL_DROP_STABLE), {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []), {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE), {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE) diff --git a/changes/ee/feat-12449.en.md b/changes/ee/feat-12449.en.md new file mode 100644 index 000000000..b9a9115d5 --- /dev/null +++ b/changes/ee/feat-12449.en.md @@ -0,0 +1 @@ +The bridges for TDengine have been split so it is available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/rel/i18n/emqx_bridge_tdengine.hocon b/rel/i18n/emqx_bridge_tdengine.hocon index ac2ac3a7b..914bbae36 100644 --- a/rel/i18n/emqx_bridge_tdengine.hocon +++ b/rel/i18n/emqx_bridge_tdengine.hocon @@ -41,7 +41,7 @@ sql_template.label: """SQL Template""" action_parameters.desc: -"""Tdengine action parameters""" +"""TDengine action parameters""" action_parameters.label: """Parameters"""