From e337e1dc408369cc8486f2ee60ee3b2701f9c26a Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 22 Jan 2024 20:45:10 +0800 Subject: [PATCH] feat(opents): improve the OpentsDB bridge to v2 style --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/emqx_bridge_opents.erl | 128 ++++- .../src/emqx_bridge_opents_action_info.erl | 71 +++ .../src/emqx_bridge_opents_connector.erl | 194 ++++++- .../test/emqx_bridge_opents_SUITE.erl | 484 +++++++----------- .../src/schema/emqx_connector_ee_schema.erl | 18 +- .../src/schema/emqx_connector_schema.erl | 4 +- rel/i18n/emqx_bridge_opents.hocon | 31 ++ rel/i18n/emqx_bridge_opents_connector.hocon | 6 + 9 files changed, 607 insertions(+), 332 deletions(-) create mode 100644 apps/emqx_bridge_opents/src/emqx_bridge_opents_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index d80050191..4f6228998 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -98,7 +98,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_timescale_action_info, emqx_bridge_redis_action_info, emqx_bridge_iotdb_action_info, - emqx_bridge_es_action_info + emqx_bridge_es_action_info, + emqx_bridge_opents_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl index cfb12453d..7e490576f 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_opents). @@ -7,10 +7,12 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/2, array/1]). -export([ - conn_bridge_examples/1 + conn_bridge_examples/1, + bridge_v2_examples/1, + default_data_template/0 ]). -export([ @@ -20,8 +22,11 @@ desc/1 ]). +-define(CONNECTOR_TYPE, opents). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). + %% ------------------------------------------------------------------------------------------------- -%% api +%% v1 examples conn_bridge_examples(Method) -> [ #{ @@ -34,7 +39,7 @@ conn_bridge_examples(Method) -> values(_Method) -> #{ - enable => true, + enabledb => true, type => opents, name => <<"foo">>, server => <<"http://127.0.0.1:4242">>, @@ -50,7 +55,37 @@ values(_Method) -> }. %% ------------------------------------------------------------------------------------------------- -%% Hocon Schema Definitions +%% v2 examples +bridge_v2_examples(Method) -> + [ + #{ + <<"opents">> => #{ + summary => <<"OpenTSDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + parameters => #{ + data => default_data_template() + } + }. + +default_data_template() -> + [ + #{ + metric => <<"${metric}">>, + tags => <<"${tags}">>, + value => <<"${value}">> + } + ]. + +%% ------------------------------------------------------------------------------------------------- +%% V1 Schema Definitions namespace() -> "bridge_opents". roots() -> []. @@ -65,10 +100,89 @@ fields("post") -> fields("put") -> fields("config"); fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"). + emqx_bridge_schema:status_fields() ++ fields("post"); +%% ------------------------------------------------------------------------------------------------- +%% V2 Schema Definitions + +fields(action) -> + {opents, + mk( + hoconsc:map(name, ref(?MODULE, action_config)), + #{ + desc => <<"OpenTSDB Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + [ + {data, + mk( + array(ref(?MODULE, action_parameters_data)), + #{ + desc => ?DESC("action_parameters_data"), + default => <<"[]">> + } + )} + ]; +fields(action_parameters_data) -> + [ + {timestamp, + mk( + binary(), + #{ + desc => ?DESC("config_parameters_timestamp"), + required => false + } + )}, + {metric, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_metric") + } + )}, + {tags, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_tags") + } + )}, + {value, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_value") + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"). desc("config") -> ?DESC("desc_config"); +desc(action_config) -> + ?DESC("desc_config"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(action_parameters_data) -> + ?DESC("action_parameters_data"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_action_info.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_action_info.erl new file mode 100644 index 000000000..4c4c9568c --- /dev/null +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_action_info.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_opents_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(ACTION_TYPE, opents). +-define(SCHEMA_MODULE, emqx_bridge_opents). + +action_type_name() -> ?ACTION_TYPE. +bridge_v1_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + MergedConfig = + emqx_utils_maps:deep_merge( + maps:without( + [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>], + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) + ), + ConnectorConfig + ), + BridgeV1Keys = schema_keys("config"), + maps:with(BridgeV1Keys, MergedConfig). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(action_config), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys(emqx_bridge_opents_connector, "config_connector"), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + schema_keys(?SCHEMA_MODULE, Name). + +schema_keys(Mod, Name) -> + [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))]. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 9271abe15..6af1e2f55 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_opents_connector). @@ -12,7 +12,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --export([roots/0, fields/1]). +-export([namespace/0, roots/0, fields/1, desc/1]). %% `emqx_resource' API -export([ @@ -21,15 +21,25 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([connector_examples/1]). + -export([connect/1]). -import(hoconsc, [mk/2, enum/1, ref/2]). +-define(CONNECTOR_TYPE, opents). + +namespace() -> "opents_connector". + %%===================================================================== -%% Hocon schema +%% V1 Hocon schema roots() -> [{config, #{type => hoconsc:ref(?MODULE, config)}}]. @@ -40,8 +50,56 @@ fields(config) -> {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})}, {details, mk(boolean(), #{default => false, desc => ?DESC("details")})}, {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} + ]; +%%===================================================================== +%% V2 Hocon schema + +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + proplists_without([auto_reconnect], fields(config)); +fields("post") -> + emqx_connector_schema:type_and_name_fields(enum([opents])) ++ fields("config_connector"); +fields("put") -> + fields("config_connector"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc("config_connector") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +proplists_without(Keys, List) -> + [El || El = {K, _} <- List, not lists:member(K, Keys)]. + +%%===================================================================== +%% V2 examples +connector_examples(Method) -> + [ + #{ + <<"opents">> => + #{ + summary => <<"OpenTSDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } ]. +connector_example_values() -> + #{ + name => <<"opents_connector">>, + type => opents, + enable => true, + server => <<"http://localhost:4242/">>, + pool_size => 8 + }. + %%======================================================================================== %% `emqx_resource' API %%======================================================================================== @@ -56,8 +114,7 @@ on_start( server := Server, pool_size := PoolSize, summary := Summary, - details := Details, - resource_opts := #{batch_size := BatchSize} + details := Details } = Config ) -> ?SLOG(info, #{ @@ -70,11 +127,10 @@ on_start( {server, to_str(Server)}, {summary, Summary}, {details, Details}, - {max_batch_size, BatchSize}, {pool_size, PoolSize} ], - State = #{pool_name => InstanceId, server => Server}, + State = #{pool_name => InstanceId, server => Server, channels => #{}}, case opentsdb_connectivity(Server) of ok -> case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of @@ -93,6 +149,7 @@ on_stop(InstanceId, _State) -> msg => "stopping_opents_connector", connector => InstanceId }), + ?tp(opents_bridge_stopped, #{instance_id => InstanceId}), emqx_resource_pool:stop(InstanceId). on_query(InstanceId, Request, State) -> @@ -101,10 +158,14 @@ on_query(InstanceId, Request, State) -> on_batch_query( InstanceId, BatchReq, - State + #{channels := Channels} = State ) -> - Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq], - do_query(InstanceId, Datas, State). + case try_render_messages(BatchReq, Channels) of + {ok, Datas} -> + do_query(InstanceId, Datas, State); + Error -> + Error + end. on_get_status(_InstanceId, #{server := Server}) -> Result = @@ -117,6 +178,39 @@ on_get_status(_InstanceId, #{server := Server}) -> end, Result. +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{ + parameters := #{data := Data} = Parameter + } +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + Channel = Parameter#{ + data := preproc_data_template(Data) + }, + Channels2 = Channels#{ChannelId => Channel}, + {ok, OldState#{channels := Channels2}} + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) -> + {ok, OldState#{channels => maps:remove(ChannelId, Channels)}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) -> + case maps:is_key(ChannelId, Channels) of + true -> + on_get_status(InstanceId, State); + _ -> + {error, not_exists} + end. + %%======================================================================================== %% Helper fns %%======================================================================================== @@ -127,6 +221,9 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) -> "opents_connector_received", #{connector => InstanceId, query => Query, state => State} ), + + ?tp(opents_bridge_on_query, #{instance_id => InstanceId}), + Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover), case Result of @@ -172,17 +269,66 @@ opentsdb_connectivity(Server) -> end, emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT). -format_opentsdb_msg(Msg) -> - maps:with( - [ - timestamp, - metric, - tags, - value, - <<"timestamp">>, - <<"metric">>, - <<"tags">>, - <<"value">> - ], - Msg +try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) -> + case maps:find(ChannelId, Channels) of + {ok, Channel} -> + {ok, + lists:foldl( + fun({_, Message}, Acc) -> + render_channel_message(Message, Channel, Acc) + end, + [], + BatchReq + )}; + _ -> + {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} + end. + +render_channel_message(Msg, #{data := DataList}, Acc) -> + RawOpts = #{return => rawlist, var_trans => fun(X) -> X end}, + lists:foldl( + fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) -> + MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg), + TagsVal = + case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of + [undefined] -> + #{}; + [Any] -> + Any + end, + ValueVal = + case ValueTk of + [_] -> + erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts)); + _ -> + emqx_placeholder:proc_tmpl(ValueTk, Msg) + end, + Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal}, + [ + case maps:get(timestamp, Data, undefined) of + undefined -> + Base; + TimestampTk -> + Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)} + end + | InAcc + ] + end, + Acc, + DataList + ). + +preproc_data_template([]) -> + preproc_data_template(emqx_bridge_opents:default_data_template()); +preproc_data_template(DataList) -> + lists:map( + fun(Data) -> + maps:map( + fun(_Key, Value) -> + emqx_placeholder:preproc_tmpl(Value) + end, + Data + ) + end, + DataList ). diff --git a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl index 3632ce786..f86ae6986 100644 --- a/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl +++ b/apps/emqx_bridge_opents/test/emqx_bridge_opents_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_opents_SUITE). @@ -12,7 +12,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). % DB defaults --define(BATCH_SIZE, 10). +-define(BRIDGE_TYPE_BIN, <<"opents">>). +-define(APPS, [opentsdb, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_opents_SUITE]). %%------------------------------------------------------------------------------ %% CT boilerplate @@ -20,95 +21,34 @@ all() -> [ - {group, with_batch}, - {group, without_batch} + {group, default} ]. groups() -> - TCs = emqx_common_test_helpers:all(?MODULE), + AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {with_batch, TCs}, - {without_batch, TCs} + {default, AllTCs} ]. -init_per_group(with_batch, Config0) -> - Config = [{batch_size, ?BATCH_SIZE} | Config0], - common_init(Config); -init_per_group(without_batch, Config0) -> - Config = [{batch_size, 1} | Config0], - common_init(Config); -init_per_group(_Group, Config) -> - Config. - -end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok; -end_per_group(_Group, _Config) -> - ok. - init_per_suite(Config) -> - Config. + emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS). -end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([opentsdb, emqx_bridge, emqx_resource, emqx_conf]), - ok. +end_per_suite(Config) -> + emqx_bridge_v2_testlib:end_per_suite(Config). -init_per_testcase(_Testcase, Config) -> - delete_bridge(Config), - snabbkaffe:start_trace(), - Config. - -end_per_testcase(_Testcase, Config) -> - ProxyHost = ?config(proxy_host, Config), - ProxyPort = ?config(proxy_port, Config), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - ok = snabbkaffe:stop(), - delete_bridge(Config), - ok. - -%%------------------------------------------------------------------------------ -%% Helper fns -%%------------------------------------------------------------------------------ - -common_init(ConfigT) -> - Host = os:getenv("OPENTS_HOST", "toxiproxy"), +init_per_group(default, Config0) -> + Host = os:getenv("OPENTS_HOST", "toxiproxy.emqx.net"), Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")), - - Config0 = [ - {opents_host, Host}, - {opents_port, Port}, - {proxy_name, "opents"} - | ConfigT - ], - - BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>), + ProxyName = "opents", case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - % Setup toxiproxy - ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), - ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), - emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), - % Ensure enterprise bridge module is loaded - ok = emqx_common_test_helpers:start_apps([ - emqx_conf, emqx_resource, emqx_bridge - ]), - _ = application:ensure_all_started(opentsdb), - _ = emqx_bridge_enterprise:module_info(), - emqx_mgmt_api_test_util:init_suite(), - {Name, OpenTSConf} = opents_config(BridgeType, Config0), - Config = - [ - {opents_config, OpenTSConf}, - {opents_bridge_type, BridgeType}, - {opents_name, Name}, - {proxy_host, ProxyHost}, - {proxy_port, ProxyPort} - | Config0 - ], - Config; + Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0), + [ + {bridge_host, Host}, + {bridge_port, Port}, + {proxy_name, ProxyName} + | Config + ]; false -> case os:getenv("IS_CI") of "yes" -> @@ -116,244 +56,152 @@ common_init(ConfigT) -> _ -> {skip, no_opents} end - end. - -opents_config(BridgeType, Config) -> - Port = integer_to_list(?config(opents_port, Config)), - Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port, - Name = atom_to_binary(?MODULE), - BatchSize = ?config(batch_size, Config), - ConfigString = - io_lib:format( - "bridges.~s.~s {\n" - " enable = true\n" - " server = ~p\n" - " resource_opts = {\n" - " request_ttl = 500ms\n" - " batch_size = ~b\n" - " query_mode = sync\n" - " }\n" - "}", - [ - BridgeType, - Name, - Server, - BatchSize - ] - ), - {Name, parse_and_check(ConfigString, BridgeType, Name)}. - -parse_and_check(ConfigString, BridgeType, Name) -> - {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + end; +init_per_group(_Group, Config) -> Config. -create_bridge(Config) -> - create_bridge(Config, _Overrides = #{}). +end_per_group(default, Config) -> + emqx_bridge_v2_testlib:end_per_group(Config), + ok; +end_per_group(_Group, _Config) -> + ok. -create_bridge(Config, Overrides) -> - BridgeType = ?config(opents_bridge_type, Config), - Name = ?config(opents_name, Config), - Config0 = ?config(opents_config, Config), - Config1 = emqx_utils_maps:deep_merge(Config0, Overrides), - emqx_bridge:create(BridgeType, Name, Config1). +init_per_testcase(TestCase, Config0) -> + Type = ?config(bridge_type, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + {_ConfigString, ConnectorConfig} = connector_config(Name, Config0), + {_, ActionConfig} = action_config(Name, Config0), + Config = [ + {connector_type, Type}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, Type}, + {bridge_name, Name}, + {bridge_config, ActionConfig} + | Config0 + ], + %% iotdb_reset(Config), + ok = snabbkaffe:start_trace(), + Config. -delete_bridge(Config) -> - BridgeType = ?config(opents_bridge_type, Config), - Name = ?config(opents_name, Config), - emqx_bridge:remove(BridgeType, Name). - -create_bridge_http(Params) -> - Path = emqx_mgmt_api_test_util:api_path(["bridges"]), - AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error - end. - -send_message(Config, Payload) -> - Name = ?config(opents_name, Config), - BridgeType = ?config(opents_bridge_type, Config), - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). - -query_resource(Config, Request) -> - query_resource(Config, Request, 1_000). - -query_resource(Config, Request, Timeout) -> - Name = ?config(opents_name, Config), - BridgeType = ?config(opents_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => Timeout}). +end_per_testcase(TestCase, Config) -> + emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config). %%------------------------------------------------------------------------------ -%% Testcases +%% Helper fns %%------------------------------------------------------------------------------ -t_setup_via_config_and_publish(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - SentData = make_data(), - ?check_trace( - begin - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, SentData), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - {ok, 200, #{failed := 0, success := 1}}, Result - ), - ok - end, - fun(Trace0) -> - Trace = ?of_kind(opents_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace), - ok - end - ), - ok. +action_config(Name, Config) -> + Type = ?config(bridge_type, Config), + ConfigString = + io_lib:format( + "actions.~s.~s {\n" + " enable = true\n" + " connector = \"~s\"\n" + " parameters = {\n" + " data = []\n" + " }\n" + "}\n", + [ + Type, + Name, + Name + ] + ), + ct:pal("ActionConfig:~ts~n", [ConfigString]), + {ConfigString, parse_action_and_check(ConfigString, Type, Name)}. -t_setup_via_http_api_and_publish(Config) -> - BridgeType = ?config(opents_bridge_type, Config), - Name = ?config(opents_name, Config), - OpentsConfig0 = ?config(opents_config, Config), - OpentsConfig = OpentsConfig0#{ - <<"name">> => Name, - <<"type">> => BridgeType - }, - ?assertMatch( - {ok, _}, - create_bridge_http(OpentsConfig) - ), - SentData = make_data(), - ?check_trace( - begin - Request = {send_message, SentData}, - Res0 = query_resource(Config, Request, 2_500), - ?assertMatch( - {ok, 200, #{failed := 0, success := 1}}, Res0 - ), - ok - end, - fun(Trace0) -> - Trace = ?of_kind(opents_connector_query_return, Trace0), - ?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace), - ok - end - ), - ok. +connector_config(Name, Config) -> + Host = ?config(bridge_host, Config), + Port = ?config(bridge_port, Config), + Type = ?config(bridge_type, Config), + ServerURL = opents_server_url(Host, Port), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " server = \"~s\"\n" + "}\n", + [ + Type, + Name, + ServerURL + ] + ), + ct:pal("ConnectorConfig:~ts~n", [ConfigString]), + {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}. -t_get_status(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), +parse_action_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). - Name = ?config(opents_name, Config), - BridgeType = ?config(opents_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), +parse_connector_and_check(ConfigString, ConnectorType, Name) -> + parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ). +%% emqx_utils_maps:safe_atom_key_map(Config). - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), - ok. +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, + Config. -t_create_disconnected(Config) -> - BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>), - Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}), - {_Name, OpenTSConf} = opents_config(BridgeType, Config1), +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +to_bin(Bin) when is_binary(Bin) -> + Bin. - Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}), - ?assertMatch({ok, _}, create_bridge(Config2)), +opents_server_url(Host, Port) -> + iolist_to_binary([ + "http://", + Host, + ":", + integer_to_binary(Port) + ]). - Name = ?config(opents_name, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)), - ok. +is_success_check({ok, 200, #{failed := Failed}}) -> + ?assertEqual(0, Failed); +is_success_check(Ret) -> + ?assert(false, Ret). -t_write_failure(Config) -> - ProxyName = ?config(proxy_name, Config), - ProxyPort = ?config(proxy_port, Config), - ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), - SentData = make_data(), - emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, SentData), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch({error, _}, Result), - ok - end), - ok. +is_error_check(Result) -> + ?assertMatch({error, {400, #{failed := 1}}}, Result). -t_write_timeout(Config) -> - ProxyName = ?config(proxy_name, Config), - ProxyPort = ?config(proxy_port, Config), - ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge( - Config, - #{ - <<"resource_opts">> => #{ - <<"request_ttl">> => <<"500ms">>, - <<"resume_interval">> => <<"100ms">>, - <<"health_check_interval">> => <<"100ms">> +opentds_query(Config, Metric) -> + Path = <<"/api/query">>, + Opts = #{return_all => true}, + Body = #{ + start => <<"1h-ago">>, + queries => [ + #{ + aggregator => <<"last">>, + metric => Metric, + tags => #{ + host => <<"*">> + } } - } - ), - SentData = make_data(), - emqx_common_test_helpers:with_failure( - timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - query_resource(Config, {send_message, SentData}) - ) - end - ), - ok. + ], + showTSUID => false, + showQuery => false, + delete => false + }, + opentsdb_request(Config, Path, Body, Opts). -t_missing_data(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, #{}), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - {error, {400, #{failed := 1, success := 0}}}, - Result - ), - ok. +opentsdb_request(Config, Path, Body) -> + opentsdb_request(Config, Path, Body, #{}). -t_bad_data(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - Data = maps:without([metric], make_data()), - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, Data), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - - ?assertMatch( - {error, {400, #{failed := 1, success := 0}}}, Result - ), - ok. - -make_data() -> - make_data(<<"cpu">>, 12). +opentsdb_request(Config, Path, Body, Opts) -> + Host = ?config(bridge_host, Config), + Port = ?config(bridge_port, Config), + ServerURL = opents_server_url(Host, Port), + URL = <>, + emqx_mgmt_api_test_util:request_api(post, URL, [], [], Body, Opts). make_data(Metric, Value) -> #{ @@ -363,3 +211,45 @@ make_data(Metric, Value) -> }, value => Value }. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_query_simple(Config) -> + Metric = <<"t_query_simple">>, + Value = 12, + MakeMessageFun = fun() -> make_data(Metric, Value) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_success_check/1, opents_bridge_on_query + ), + {ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric), + QResult = emqx_utils_json:decode(IoTDBResult), + ?assertMatch( + [ + #{ + <<"metric">> := Metric, + <<"dps">> := _ + } + ], + QResult + ), + [#{<<"dps">> := Dps}] = QResult, + ?assertMatch([Value | _], maps:values(Dps)). + +t_create_via_http(Config) -> + emqx_bridge_v2_testlib:t_create_via_http(Config). + +t_start_stop(Config) -> + emqx_bridge_v2_testlib:t_start_stop(Config, opents_bridge_stopped). + +t_on_get_status(Config) -> + emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}). + +t_query_invalid_data(Config) -> + Metric = <<"t_query_invalid_data">>, + Value = 12, + MakeMessageFun = fun() -> maps:remove(value, make_data(Metric, Value)) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query + ). diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 655892d88..90c1ae1ce 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -52,6 +52,8 @@ resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(elasticsearch) -> emqx_bridge_es_connector; +resource_type(opents) -> + emqx_bridge_opents_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -66,6 +68,8 @@ connector_impl_module(iotdb) -> emqx_bridge_iotdb_connector; connector_impl_module(elasticsearch) -> emqx_bridge_es_connector; +connector_impl_module(opents) -> + emqx_bridge_opents_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -193,6 +197,14 @@ connector_structs() -> desc => <<"ElasticSearch Connector Config">>, required => false } + )}, + {opents, + mk( + hoconsc:map(name, ref(emqx_bridge_opents_connector, "config_connector")), + #{ + desc => <<"OpenTSDB Connector Config">>, + required => false + } )} ]. @@ -212,7 +224,8 @@ schema_modules() -> emqx_postgresql_connector_schema, emqx_bridge_redis_schema, emqx_bridge_iotdb_connector, - emqx_bridge_es_connector + emqx_bridge_es_connector, + emqx_bridge_opents_connector ]. api_schemas(Method) -> @@ -241,7 +254,8 @@ api_schemas(Method) -> api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), - api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method) + api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), + api_ref(emqx_bridge_opents_connector, <<"opents">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 615b89230..1829e04e6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -154,7 +154,9 @@ connector_type_to_bridge_types(timescale) -> connector_type_to_bridge_types(iotdb) -> [iotdb]; connector_type_to_bridge_types(elasticsearch) -> - [elasticsearch]. + [elasticsearch]; +connector_type_to_bridge_types(opents) -> + [opents]. actions_config_name(action) -> <<"actions">>; actions_config_name(source) -> <<"sources">>. diff --git a/rel/i18n/emqx_bridge_opents.hocon b/rel/i18n/emqx_bridge_opents.hocon index ff44a9e18..5f1c4b0af 100644 --- a/rel/i18n/emqx_bridge_opents.hocon +++ b/rel/i18n/emqx_bridge_opents.hocon @@ -23,4 +23,35 @@ emqx_bridge_opents { desc_name.label: "Bridge Name" + +action_parameters_data.desc: +"""OpenTSDB action parameter data""" + +action_parameters_data.label: +"""Parameter Data""" + +config_parameters_timestamp.desc: +"""Timestamp. Placeholders in format of ${var} is supported""" + +config_parameters_timestamp.label: +"""Timestamp""" + +config_parameters_metric.metric: +"""Metric. Placeholders in format of ${var} is supported""" + +config_parameters_metric.metric: +"""Metric""" + +config_parameters_tags.desc: +"""Data Type, Placeholders in format of ${var} is supported""" + +config_parameters_tags.label: +"""Tags""" + +config_parameters_value.desc: +"""Value. Placeholders in format of ${var} is supported""" + +config_parameters_value.label: +"""Value""" + } diff --git a/rel/i18n/emqx_bridge_opents_connector.hocon b/rel/i18n/emqx_bridge_opents_connector.hocon index 5c39d1e0e..a54c240a0 100644 --- a/rel/i18n/emqx_bridge_opents_connector.hocon +++ b/rel/i18n/emqx_bridge_opents_connector.hocon @@ -17,4 +17,10 @@ emqx_bridge_opents_connector { details.label: "Details" + +desc_config.desc: +"""Configuration for OpenTSDB Connector.""" + +desc_config.label: +"""OpenTSDB Connector Configuration""" }