feat(opents): improve the OpentsDB bridge to v2 style

This commit is contained in:
firest 2024-01-22 20:45:10 +08:00 committed by zhongwencool
parent 878c9ee8b1
commit dad8a32e0b
9 changed files with 607 additions and 332 deletions

View File

@ -98,7 +98,8 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_timescale_action_info, emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info, emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info, emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info emqx_bridge_es_action_info,
emqx_bridge_opents_action_info
]. ].
-else. -else.
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_opents). -module(emqx_bridge_opents).
@ -7,10 +7,12 @@
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2, array/1]).
-export([ -export([
conn_bridge_examples/1 conn_bridge_examples/1,
bridge_v2_examples/1,
default_data_template/0
]). ]).
-export([ -export([
@ -20,8 +22,11 @@
desc/1 desc/1
]). ]).
-define(CONNECTOR_TYPE, opents).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% v1 examples
conn_bridge_examples(Method) -> conn_bridge_examples(Method) ->
[ [
#{ #{
@ -34,7 +39,7 @@ conn_bridge_examples(Method) ->
values(_Method) -> values(_Method) ->
#{ #{
enable => true, enabledb => true,
type => opents, type => opents,
name => <<"foo">>, name => <<"foo">>,
server => <<"http://127.0.0.1:4242">>, server => <<"http://127.0.0.1:4242">>,
@ -50,7 +55,37 @@ values(_Method) ->
}. }.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% v2 examples
bridge_v2_examples(Method) ->
[
#{
<<"opents">> => #{
summary => <<"OpenTSDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
parameters => #{
data => default_data_template()
}
}.
default_data_template() ->
[
#{
metric => <<"${metric}">>,
tags => <<"${tags}">>,
value => <<"${value}">>
}
].
%% -------------------------------------------------------------------------------------------------
%% V1 Schema Definitions
namespace() -> "bridge_opents". namespace() -> "bridge_opents".
roots() -> []. roots() -> [].
@ -65,10 +100,89 @@ fields("post") ->
fields("put") -> fields("put") ->
fields("config"); fields("config");
fields("get") -> fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post"). emqx_bridge_schema:status_fields() ++ fields("post");
%% -------------------------------------------------------------------------------------------------
%% V2 Schema Definitions
fields(action) ->
{opents,
mk(
hoconsc:map(name, ref(?MODULE, action_config)),
#{
desc => <<"OpenTSDB Action Config">>,
required => false
}
)};
fields(action_config) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
ref(?MODULE, action_parameters),
#{
required => true, desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
[
{data,
mk(
array(ref(?MODULE, action_parameters_data)),
#{
desc => ?DESC("action_parameters_data"),
default => <<"[]">>
}
)}
];
fields(action_parameters_data) ->
[
{timestamp,
mk(
binary(),
#{
desc => ?DESC("config_parameters_timestamp"),
required => false
}
)},
{metric,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_metric")
}
)},
{tags,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_tags")
}
)},
{value,
mk(
binary(),
#{
required => true,
desc => ?DESC("config_parameters_value")
}
)}
];
fields("post_bridge_v2") ->
emqx_bridge_schema:type_and_name_fields(enum([opents])) ++ fields(action_config);
fields("put_bridge_v2") ->
fields(action_config);
fields("get_bridge_v2") ->
emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2").
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(action_config) ->
?DESC("desc_config");
desc(action_parameters) ->
?DESC("action_parameters");
desc(action_parameters_data) ->
?DESC("action_parameters_data");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."]; ["Configuration for OpenTSDB using `", string:to_upper(Method), "` method."];
desc(_) -> desc(_) ->

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_opents_action_info).
-behaviour(emqx_action_info).
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
%% behaviour callbacks
-export([
action_type_name/0,
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
bridge_v1_type_name/0,
connector_action_config_to_bridge_v1_config/2,
connector_type_name/0,
schema_module/0
]).
-import(emqx_utils_conv, [bin/1]).
-define(ACTION_TYPE, opents).
-define(SCHEMA_MODULE, emqx_bridge_opents).
action_type_name() -> ?ACTION_TYPE.
bridge_v1_type_name() -> ?ACTION_TYPE.
connector_type_name() -> ?ACTION_TYPE.
schema_module() -> ?SCHEMA_MODULE.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
MergedConfig =
emqx_utils_maps:deep_merge(
maps:without(
[<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>],
emqx_utils_maps:unindent(<<"parameters">>, ActionConfig)
),
ConnectorConfig
),
BridgeV1Keys = schema_keys("config"),
maps:with(BridgeV1Keys, MergedConfig).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(action_config),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ConnectorKeys = schema_keys(emqx_bridge_opents_connector, "config_connector"),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
maps:with(ConnectorKeys, BridgeV1Config)
).
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
schema_keys(Name) ->
schema_keys(?SCHEMA_MODULE, Name).
schema_keys(Mod, Name) ->
[bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))].

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_opents_connector). -module(emqx_bridge_opents_connector).
@ -12,7 +12,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-export([roots/0, fields/1]). -export([namespace/0, roots/0, fields/1, desc/1]).
%% `emqx_resource' API %% `emqx_resource' API
-export([ -export([
@ -21,15 +21,25 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_get_status/2 on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-export([connector_examples/1]).
-export([connect/1]). -export([connect/1]).
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-define(CONNECTOR_TYPE, opents).
namespace() -> "opents_connector".
%%===================================================================== %%=====================================================================
%% Hocon schema %% V1 Hocon schema
roots() -> roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}]. [{config, #{type => hoconsc:ref(?MODULE, config)}}].
@ -40,8 +50,56 @@ fields(config) ->
{summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})}, {summary, mk(boolean(), #{default => true, desc => ?DESC("summary")})},
{details, mk(boolean(), #{default => false, desc => ?DESC("details")})}, {details, mk(boolean(), #{default => false, desc => ?DESC("details")})},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
];
%%=====================================================================
%% V2 Hocon schema
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
proplists_without([auto_reconnect], fields(config));
fields("post") ->
emqx_connector_schema:type_and_name_fields(enum([opents])) ++ fields("config_connector");
fields("put") ->
fields("config_connector");
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post").
desc(config) ->
?DESC("desc_config");
desc("config_connector") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
desc(_) ->
undefined.
proplists_without(Keys, List) ->
[El || El = {K, _} <- List, not lists:member(K, Keys)].
%%=====================================================================
%% V2 examples
connector_examples(Method) ->
[
#{
<<"opents">> =>
#{
summary => <<"OpenTSDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_example_values()
)
}
}
]. ].
connector_example_values() ->
#{
name => <<"opents_connector">>,
type => opents,
enable => true,
server => <<"http://localhost:4242/">>,
pool_size => 8
}.
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
@ -56,8 +114,7 @@ on_start(
server := Server, server := Server,
pool_size := PoolSize, pool_size := PoolSize,
summary := Summary, summary := Summary,
details := Details, details := Details
resource_opts := #{batch_size := BatchSize}
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
@ -70,11 +127,10 @@ on_start(
{server, to_str(Server)}, {server, to_str(Server)},
{summary, Summary}, {summary, Summary},
{details, Details}, {details, Details},
{max_batch_size, BatchSize},
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
State = #{pool_name => InstanceId, server => Server}, State = #{pool_name => InstanceId, server => Server, channels => #{}},
case opentsdb_connectivity(Server) of case opentsdb_connectivity(Server) of
ok -> ok ->
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
@ -93,6 +149,7 @@ on_stop(InstanceId, _State) ->
msg => "stopping_opents_connector", msg => "stopping_opents_connector",
connector => InstanceId connector => InstanceId
}), }),
?tp(opents_bridge_stopped, #{instance_id => InstanceId}),
emqx_resource_pool:stop(InstanceId). emqx_resource_pool:stop(InstanceId).
on_query(InstanceId, Request, State) -> on_query(InstanceId, Request, State) ->
@ -101,10 +158,14 @@ on_query(InstanceId, Request, State) ->
on_batch_query( on_batch_query(
InstanceId, InstanceId,
BatchReq, BatchReq,
State #{channels := Channels} = State
) -> ) ->
Datas = [format_opentsdb_msg(Msg) || {_Key, Msg} <- BatchReq], case try_render_messages(BatchReq, Channels) of
do_query(InstanceId, Datas, State). {ok, Datas} ->
do_query(InstanceId, Datas, State);
Error ->
Error
end.
on_get_status(_InstanceId, #{server := Server}) -> on_get_status(_InstanceId, #{server := Server}) ->
Result = Result =
@ -117,6 +178,39 @@ on_get_status(_InstanceId, #{server := Server}) ->
end, end,
Result. Result.
on_add_channel(
_InstanceId,
#{channels := Channels} = OldState,
ChannelId,
#{
parameters := #{data := Data} = Parameter
}
) ->
case maps:is_key(ChannelId, Channels) of
true ->
{error, already_exists};
_ ->
Channel = Parameter#{
data := preproc_data_template(Data)
},
Channels2 = Channels#{ChannelId => Channel},
{ok, OldState#{channels := Channels2}}
end.
on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) ->
{ok, OldState#{channels => maps:remove(ChannelId, Channels)}}.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
case maps:is_key(ChannelId, Channels) of
true ->
on_get_status(InstanceId, State);
_ ->
{error, not_exists}
end.
%%======================================================================================== %%========================================================================================
%% Helper fns %% Helper fns
%%======================================================================================== %%========================================================================================
@ -127,6 +221,9 @@ do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
"opents_connector_received", "opents_connector_received",
#{connector => InstanceId, query => Query, state => State} #{connector => InstanceId, query => Query, state => State}
), ),
?tp(opents_bridge_on_query, #{instance_id => InstanceId}),
Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover), Result = ecpool:pick_and_do(PoolName, {opentsdb, put, [Query]}, no_handover),
case Result of case Result of
@ -172,17 +269,66 @@ opentsdb_connectivity(Server) ->
end, end,
emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT). emqx_connector_lib:http_connectivity(SvrUrl, ?HTTP_CONNECT_TIMEOUT).
format_opentsdb_msg(Msg) -> try_render_messages([{ChannelId, _} | _] = BatchReq, Channels) ->
maps:with( case maps:find(ChannelId, Channels) of
[ {ok, Channel} ->
timestamp, {ok,
metric, lists:foldl(
tags, fun({_, Message}, Acc) ->
value, render_channel_message(Message, Channel, Acc)
<<"timestamp">>, end,
<<"metric">>, [],
<<"tags">>, BatchReq
<<"value">> )};
], _ ->
Msg {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
end.
render_channel_message(Msg, #{data := DataList}, Acc) ->
RawOpts = #{return => rawlist, var_trans => fun(X) -> X end},
lists:foldl(
fun(#{metric := MetricTk, tags := TagsTk, value := ValueTk} = Data, InAcc) ->
MetricVal = emqx_placeholder:proc_tmpl(MetricTk, Msg),
TagsVal =
case emqx_placeholder:proc_tmpl(TagsTk, Msg, RawOpts) of
[undefined] ->
#{};
[Any] ->
Any
end,
ValueVal =
case ValueTk of
[_] ->
erlang:hd(emqx_placeholder:proc_tmpl(ValueTk, Msg, RawOpts));
_ ->
emqx_placeholder:proc_tmpl(ValueTk, Msg)
end,
Base = #{metric => MetricVal, tags => TagsVal, value => ValueVal},
[
case maps:get(timestamp, Data, undefined) of
undefined ->
Base;
TimestampTk ->
Base#{timestamp => emqx_placeholder:proc_tmpl(TimestampTk, Msg)}
end
| InAcc
]
end,
Acc,
DataList
).
preproc_data_template([]) ->
preproc_data_template(emqx_bridge_opents:default_data_template());
preproc_data_template(DataList) ->
lists:map(
fun(Data) ->
maps:map(
fun(_Key, Value) ->
emqx_placeholder:preproc_tmpl(Value)
end,
Data
)
end,
DataList
). ).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_bridge_opents_SUITE). -module(emqx_bridge_opents_SUITE).
@ -12,7 +12,8 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
% DB defaults % DB defaults
-define(BATCH_SIZE, 10). -define(BRIDGE_TYPE_BIN, <<"opents">>).
-define(APPS, [opentsdb, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_opents_SUITE]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% CT boilerplate %% CT boilerplate
@ -20,95 +21,34 @@
all() -> all() ->
[ [
{group, with_batch}, {group, default}
{group, without_batch}
]. ].
groups() -> groups() ->
TCs = emqx_common_test_helpers:all(?MODULE), AllTCs = emqx_common_test_helpers:all(?MODULE),
[ [
{with_batch, TCs}, {default, AllTCs}
{without_batch, TCs}
]. ].
init_per_group(with_batch, Config0) ->
Config = [{batch_size, ?BATCH_SIZE} | Config0],
common_init(Config);
init_per_group(without_batch, Config0) ->
Config = [{batch_size, 1} | Config0],
common_init(Config);
init_per_group(_Group, Config) ->
Config.
end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok;
end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) -> init_per_suite(Config) ->
Config. emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_mgmt_api_test_util:end_suite(), emqx_bridge_v2_testlib:end_per_suite(Config).
ok = emqx_common_test_helpers:stop_apps([opentsdb, emqx_bridge, emqx_resource, emqx_conf]),
ok.
init_per_testcase(_Testcase, Config) -> init_per_group(default, Config0) ->
delete_bridge(Config), Host = os:getenv("OPENTS_HOST", "toxiproxy.emqx.net"),
snabbkaffe:start_trace(),
Config.
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
ok = snabbkaffe:stop(),
delete_bridge(Config),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
common_init(ConfigT) ->
Host = os:getenv("OPENTS_HOST", "toxiproxy"),
Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")), Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")),
ProxyName = "opents",
Config0 = [
{opents_host, Host},
{opents_port, Port},
{proxy_name, "opents"}
| ConfigT
],
BridgeType = proplists:get_value(bridge_type, Config0, <<"opents">>),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true -> true ->
% Setup toxiproxy Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), [
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), {bridge_host, Host},
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), {bridge_port, Port},
% Ensure enterprise bridge module is loaded {proxy_name, ProxyName}
ok = emqx_common_test_helpers:start_apps([ | Config
emqx_conf, emqx_resource, emqx_bridge ];
]),
_ = application:ensure_all_started(opentsdb),
_ = emqx_bridge_enterprise:module_info(),
emqx_mgmt_api_test_util:init_suite(),
{Name, OpenTSConf} = opents_config(BridgeType, Config0),
Config =
[
{opents_config, OpenTSConf},
{opents_bridge_type, BridgeType},
{opents_name, Name},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort}
| Config0
],
Config;
false -> false ->
case os:getenv("IS_CI") of case os:getenv("IS_CI") of
"yes" -> "yes" ->
@ -116,244 +56,152 @@ common_init(ConfigT) ->
_ -> _ ->
{skip, no_opents} {skip, no_opents}
end end
end. end;
init_per_group(_Group, Config) ->
opents_config(BridgeType, Config) ->
Port = integer_to_list(?config(opents_port, Config)),
Server = "http://" ++ ?config(opents_host, Config) ++ ":" ++ Port,
Name = atom_to_binary(?MODULE),
BatchSize = ?config(batch_size, Config),
ConfigString =
io_lib:format(
"bridges.~s.~s {\n"
" enable = true\n"
" server = ~p\n"
" resource_opts = {\n"
" request_ttl = 500ms\n"
" batch_size = ~b\n"
" query_mode = sync\n"
" }\n"
"}",
[
BridgeType,
Name,
Server,
BatchSize
]
),
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf,
Config. Config.
create_bridge(Config) -> end_per_group(default, Config) ->
create_bridge(Config, _Overrides = #{}). emqx_bridge_v2_testlib:end_per_group(Config),
ok;
end_per_group(_Group, _Config) ->
ok.
create_bridge(Config, Overrides) -> init_per_testcase(TestCase, Config0) ->
BridgeType = ?config(opents_bridge_type, Config), Type = ?config(bridge_type, Config0),
Name = ?config(opents_name, Config), UniqueNum = integer_to_binary(erlang:unique_integer()),
Config0 = ?config(opents_config, Config), Name = <<
Config1 = emqx_utils_maps:deep_merge(Config0, Overrides), (atom_to_binary(TestCase))/binary, UniqueNum/binary
emqx_bridge:create(BridgeType, Name, Config1). >>,
{_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
{_, ActionConfig} = action_config(Name, Config0),
Config = [
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig},
{bridge_type, Type},
{bridge_name, Name},
{bridge_config, ActionConfig}
| Config0
],
%% iotdb_reset(Config),
ok = snabbkaffe:start_trace(),
Config.
delete_bridge(Config) -> end_per_testcase(TestCase, Config) ->
BridgeType = ?config(opents_bridge_type, Config), emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
Name = ?config(opents_name, Config),
emqx_bridge:remove(BridgeType, Name).
create_bridge_http(Params) ->
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
{ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
Error -> Error
end.
send_message(Config, Payload) ->
Name = ?config(opents_name, Config),
BridgeType = ?config(opents_bridge_type, Config),
BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
emqx_bridge:send_message(BridgeID, Payload).
query_resource(Config, Request) ->
query_resource(Config, Request, 1_000).
query_resource(Config, Request, Timeout) ->
Name = ?config(opents_name, Config),
BridgeType = ?config(opents_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => Timeout}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
t_setup_via_config_and_publish(Config) -> action_config(Name, Config) ->
?assertMatch( Type = ?config(bridge_type, Config),
{ok, _}, ConfigString =
create_bridge(Config) io_lib:format(
), "actions.~s.~s {\n"
SentData = make_data(), " enable = true\n"
?check_trace( " connector = \"~s\"\n"
begin " parameters = {\n"
{_, {ok, #{result := Result}}} = " data = []\n"
?wait_async_action( " }\n"
send_message(Config, SentData), "}\n",
#{?snk_kind := buffer_worker_flush_ack}, [
2_000 Type,
), Name,
?assertMatch( Name
{ok, 200, #{failed := 0, success := 1}}, Result ]
), ),
ok ct:pal("ActionConfig:~ts~n", [ConfigString]),
end, {ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
fun(Trace0) ->
Trace = ?of_kind(opents_connector_query_return, Trace0),
?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
ok
end
),
ok.
t_setup_via_http_api_and_publish(Config) -> connector_config(Name, Config) ->
BridgeType = ?config(opents_bridge_type, Config), Host = ?config(bridge_host, Config),
Name = ?config(opents_name, Config), Port = ?config(bridge_port, Config),
OpentsConfig0 = ?config(opents_config, Config), Type = ?config(bridge_type, Config),
OpentsConfig = OpentsConfig0#{ ServerURL = opents_server_url(Host, Port),
<<"name">> => Name, ConfigString =
<<"type">> => BridgeType io_lib:format(
}, "connectors.~s.~s {\n"
?assertMatch( " enable = true\n"
{ok, _}, " server = \"~s\"\n"
create_bridge_http(OpentsConfig) "}\n",
), [
SentData = make_data(), Type,
?check_trace( Name,
begin ServerURL
Request = {send_message, SentData}, ]
Res0 = query_resource(Config, Request, 2_500), ),
?assertMatch( ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
{ok, 200, #{failed := 0, success := 1}}, Res0 {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
),
ok
end,
fun(Trace0) ->
Trace = ?of_kind(opents_connector_query_return, Trace0),
?assertMatch([#{result := {ok, 200, #{failed := 0, success := 1}}}], Trace),
ok
end
),
ok.
t_get_status(Config) -> parse_action_and_check(ConfigString, BridgeType, Name) ->
?assertMatch( parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
{ok, _},
create_bridge(Config)
),
Name = ?config(opents_name, Config), parse_connector_and_check(ConfigString, ConnectorType, Name) ->
BridgeType = ?config(opents_bridge_type, Config), parse_and_check(
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
).
%% emqx_utils_maps:safe_atom_key_map(Config).
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
ok. Type = to_bin(Type0),
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
#{RootKey := #{Type := #{Name := Config}}} = RawConf,
Config.
t_create_disconnected(Config) -> to_bin(List) when is_list(List) ->
BridgeType = proplists:get_value(bridge_type, Config, <<"opents">>), unicode:characters_to_binary(List, utf8);
Config1 = lists:keyreplace(opents_port, 1, Config, {opents_port, 61234}), to_bin(Atom) when is_atom(Atom) ->
{_Name, OpenTSConf} = opents_config(BridgeType, Config1), erlang:atom_to_binary(Atom);
to_bin(Bin) when is_binary(Bin) ->
Bin.
Config2 = lists:keyreplace(opents_config, 1, Config1, {opents_config, OpenTSConf}), opents_server_url(Host, Port) ->
?assertMatch({ok, _}, create_bridge(Config2)), iolist_to_binary([
"http://",
Host,
":",
integer_to_binary(Port)
]).
Name = ?config(opents_name, Config), is_success_check({ok, 200, #{failed := Failed}}) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?assertEqual(0, Failed);
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceID)), is_success_check(Ret) ->
ok. ?assert(false, Ret).
t_write_failure(Config) -> is_error_check(Result) ->
ProxyName = ?config(proxy_name, Config), ?assertMatch({error, {400, #{failed := 1}}}, Result).
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
{ok, _} = create_bridge(Config),
SentData = make_data(),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, SentData),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch({error, _}, Result),
ok
end),
ok.
t_write_timeout(Config) -> opentds_query(Config, Metric) ->
ProxyName = ?config(proxy_name, Config), Path = <<"/api/query">>,
ProxyPort = ?config(proxy_port, Config), Opts = #{return_all => true},
ProxyHost = ?config(proxy_host, Config), Body = #{
{ok, _} = create_bridge( start => <<"1h-ago">>,
Config, queries => [
#{ #{
<<"resource_opts">> => #{ aggregator => <<"last">>,
<<"request_ttl">> => <<"500ms">>, metric => Metric,
<<"resume_interval">> => <<"100ms">>, tags => #{
<<"health_check_interval">> => <<"100ms">> host => <<"*">>
}
} }
} ],
), showTSUID => false,
SentData = make_data(), showQuery => false,
emqx_common_test_helpers:with_failure( delete => false
timeout, ProxyName, ProxyHost, ProxyPort, fun() -> },
?assertMatch( opentsdb_request(Config, Path, Body, Opts).
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData})
)
end
),
ok.
t_missing_data(Config) -> opentsdb_request(Config, Path, Body) ->
?assertMatch( opentsdb_request(Config, Path, Body, #{}).
{ok, _},
create_bridge(Config)
),
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, #{}),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{error, {400, #{failed := 1, success := 0}}},
Result
),
ok.
t_bad_data(Config) -> opentsdb_request(Config, Path, Body, Opts) ->
?assertMatch( Host = ?config(bridge_host, Config),
{ok, _}, Port = ?config(bridge_port, Config),
create_bridge(Config) ServerURL = opents_server_url(Host, Port),
), URL = <<ServerURL/binary, Path/binary>>,
Data = maps:without([metric], make_data()), emqx_mgmt_api_test_util:request_api(post, URL, [], [], Body, Opts).
{_, {ok, #{result := Result}}} =
?wait_async_action(
send_message(Config, Data),
#{?snk_kind := buffer_worker_flush_ack},
2_000
),
?assertMatch(
{error, {400, #{failed := 1, success := 0}}}, Result
),
ok.
make_data() ->
make_data(<<"cpu">>, 12).
make_data(Metric, Value) -> make_data(Metric, Value) ->
#{ #{
@ -363,3 +211,45 @@ make_data(Metric, Value) ->
}, },
value => Value value => Value
}. }.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_query_simple(Config) ->
Metric = <<"t_query_simple">>,
Value = 12,
MakeMessageFun = fun() -> make_data(Metric, Value) end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, opents_bridge_on_query
),
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
QResult = emqx_utils_json:decode(IoTDBResult),
?assertMatch(
[
#{
<<"metric">> := Metric,
<<"dps">> := _
}
],
QResult
),
[#{<<"dps">> := Dps}] = QResult,
?assertMatch([Value | _], maps:values(Dps)).
t_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_start_stop(Config) ->
emqx_bridge_v2_testlib:t_start_stop(Config, opents_bridge_stopped).
t_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
t_query_invalid_data(Config) ->
Metric = <<"t_query_invalid_data">>,
Value = 12,
MakeMessageFun = fun() -> maps:remove(value, make_data(Metric, Value)) end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
).

View File

@ -52,6 +52,8 @@ resource_type(iotdb) ->
emqx_bridge_iotdb_connector; emqx_bridge_iotdb_connector;
resource_type(elasticsearch) -> resource_type(elasticsearch) ->
emqx_bridge_es_connector; emqx_bridge_es_connector;
resource_type(opents) ->
emqx_bridge_opents_connector;
resource_type(Type) -> resource_type(Type) ->
error({unknown_connector_type, Type}). error({unknown_connector_type, Type}).
@ -66,6 +68,8 @@ connector_impl_module(iotdb) ->
emqx_bridge_iotdb_connector; emqx_bridge_iotdb_connector;
connector_impl_module(elasticsearch) -> connector_impl_module(elasticsearch) ->
emqx_bridge_es_connector; emqx_bridge_es_connector;
connector_impl_module(opents) ->
emqx_bridge_opents_connector;
connector_impl_module(_ConnectorType) -> connector_impl_module(_ConnectorType) ->
undefined. undefined.
@ -193,6 +197,14 @@ connector_structs() ->
desc => <<"ElasticSearch Connector Config">>, desc => <<"ElasticSearch Connector Config">>,
required => false required => false
} }
)},
{opents,
mk(
hoconsc:map(name, ref(emqx_bridge_opents_connector, "config_connector")),
#{
desc => <<"OpenTSDB Connector Config">>,
required => false
}
)} )}
]. ].
@ -212,7 +224,8 @@ schema_modules() ->
emqx_postgresql_connector_schema, emqx_postgresql_connector_schema,
emqx_bridge_redis_schema, emqx_bridge_redis_schema,
emqx_bridge_iotdb_connector, emqx_bridge_iotdb_connector,
emqx_bridge_es_connector emqx_bridge_es_connector,
emqx_bridge_opents_connector
]. ].
api_schemas(Method) -> api_schemas(Method) ->
@ -241,7 +254,8 @@ api_schemas(Method) ->
api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"),
api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"),
api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method),
api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method) api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method),
api_ref(emqx_bridge_opents_connector, <<"opents">>, Method)
]. ].
api_ref(Module, Type, Method) -> api_ref(Module, Type, Method) ->

View File

@ -154,7 +154,9 @@ connector_type_to_bridge_types(timescale) ->
connector_type_to_bridge_types(iotdb) -> connector_type_to_bridge_types(iotdb) ->
[iotdb]; [iotdb];
connector_type_to_bridge_types(elasticsearch) -> connector_type_to_bridge_types(elasticsearch) ->
[elasticsearch]. [elasticsearch];
connector_type_to_bridge_types(opents) ->
[opents].
actions_config_name(action) -> <<"actions">>; actions_config_name(action) -> <<"actions">>;
actions_config_name(source) -> <<"sources">>. actions_config_name(source) -> <<"sources">>.

View File

@ -23,4 +23,35 @@ emqx_bridge_opents {
desc_name.label: desc_name.label:
"Bridge Name" "Bridge Name"
action_parameters_data.desc:
"""OpenTSDB action parameter data"""
action_parameters_data.label:
"""Parameter Data"""
config_parameters_timestamp.desc:
"""Timestamp. Placeholders in format of ${var} is supported"""
config_parameters_timestamp.label:
"""Timestamp"""
config_parameters_metric.metric:
"""Metric. Placeholders in format of ${var} is supported"""
config_parameters_metric.metric:
"""Metric"""
config_parameters_tags.desc:
"""Data Type, Placeholders in format of ${var} is supported"""
config_parameters_tags.label:
"""Tags"""
config_parameters_value.desc:
"""Value. Placeholders in format of ${var} is supported"""
config_parameters_value.label:
"""Value"""
} }

View File

@ -17,4 +17,10 @@ emqx_bridge_opents_connector {
details.label: details.label:
"Details" "Details"
desc_config.desc:
"""Configuration for OpenTSDB Connector."""
desc_config.label:
"""OpenTSDB Connector Configuration"""
} }