432 lines
13 KiB
Erlang
432 lines
13 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_opents_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
% DB defaults
|
|
-define(BRIDGE_TYPE_BIN, <<"opents">>).
|
|
-define(APPS, [opentsdb, emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_opents_SUITE]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% CT boilerplate
|
|
%%------------------------------------------------------------------------------
|
|
|
|
all() ->
|
|
[
|
|
{group, default}
|
|
].
|
|
|
|
groups() ->
|
|
AllTCs = emqx_common_test_helpers:all(?MODULE),
|
|
[
|
|
{default, AllTCs}
|
|
].
|
|
|
|
init_per_suite(Config) ->
|
|
emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS).
|
|
|
|
end_per_suite(Config) ->
|
|
emqx_bridge_v2_testlib:end_per_suite(Config).
|
|
|
|
init_per_group(default, Config0) ->
|
|
Host = os:getenv("OPENTS_HOST", "toxiproxy.emqx.net"),
|
|
Port = list_to_integer(os:getenv("OPENTS_PORT", "4242")),
|
|
ProxyName = "opents",
|
|
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
|
true ->
|
|
Config = emqx_bridge_v2_testlib:init_per_group(default, ?BRIDGE_TYPE_BIN, Config0),
|
|
[
|
|
{bridge_host, Host},
|
|
{bridge_port, Port},
|
|
{proxy_name, ProxyName}
|
|
| Config
|
|
];
|
|
false ->
|
|
case os:getenv("IS_CI") of
|
|
"yes" ->
|
|
throw(no_opents);
|
|
_ ->
|
|
{skip, no_opents}
|
|
end
|
|
end;
|
|
init_per_group(_Group, Config) ->
|
|
Config.
|
|
|
|
end_per_group(default, Config) ->
|
|
emqx_bridge_v2_testlib:end_per_group(Config),
|
|
ok;
|
|
end_per_group(_Group, _Config) ->
|
|
ok.
|
|
|
|
init_per_testcase(TestCase, Config0) ->
|
|
Type = ?config(bridge_type, Config0),
|
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
|
Name = <<
|
|
(atom_to_binary(TestCase))/binary, UniqueNum/binary
|
|
>>,
|
|
{_ConfigString, ConnectorConfig} = connector_config(Name, Config0),
|
|
{_, ActionConfig} = action_config(Name, Config0),
|
|
Config = [
|
|
{connector_type, Type},
|
|
{connector_name, Name},
|
|
{connector_config, ConnectorConfig},
|
|
{bridge_type, Type},
|
|
{bridge_name, Name},
|
|
{bridge_config, ActionConfig}
|
|
| Config0
|
|
],
|
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
ok = snabbkaffe:start_trace(),
|
|
Config.
|
|
|
|
end_per_testcase(TestCase, Config) ->
|
|
emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Helper fns
|
|
%%------------------------------------------------------------------------------
|
|
|
|
action_config(Name, Config) ->
|
|
Type = ?config(bridge_type, Config),
|
|
ConfigString =
|
|
io_lib:format(
|
|
"actions.~s.~s {\n"
|
|
" enable = true\n"
|
|
" connector = \"~s\"\n"
|
|
" parameters = {\n"
|
|
" data = []\n"
|
|
" }\n"
|
|
"}\n",
|
|
[
|
|
Type,
|
|
Name,
|
|
Name
|
|
]
|
|
),
|
|
ct:pal("ActionConfig:~ts~n", [ConfigString]),
|
|
{ConfigString, parse_action_and_check(ConfigString, Type, Name)}.
|
|
|
|
connector_config(Name, Config) ->
|
|
Host = ?config(bridge_host, Config),
|
|
Port = ?config(bridge_port, Config),
|
|
Type = ?config(bridge_type, Config),
|
|
ServerURL = opents_server_url(Host, Port),
|
|
ConfigString =
|
|
io_lib:format(
|
|
"connectors.~s.~s {\n"
|
|
" enable = true\n"
|
|
" server = \"~s\"\n"
|
|
"}\n",
|
|
[
|
|
Type,
|
|
Name,
|
|
ServerURL
|
|
]
|
|
),
|
|
ct:pal("ConnectorConfig:~ts~n", [ConfigString]),
|
|
{ConfigString, parse_connector_and_check(ConfigString, Type, Name)}.
|
|
|
|
parse_action_and_check(ConfigString, BridgeType, Name) ->
|
|
parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name).
|
|
|
|
parse_connector_and_check(ConfigString, ConnectorType, Name) ->
|
|
parse_and_check(
|
|
ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name
|
|
).
|
|
%% emqx_utils_maps:safe_atom_key_map(Config).
|
|
|
|
parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) ->
|
|
Type = to_bin(Type0),
|
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
|
hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}),
|
|
#{RootKey := #{Type := #{Name := Config}}} = RawConf,
|
|
Config.
|
|
|
|
to_bin(List) when is_list(List) ->
|
|
unicode:characters_to_binary(List, utf8);
|
|
to_bin(Atom) when is_atom(Atom) ->
|
|
erlang:atom_to_binary(Atom);
|
|
to_bin(Bin) when is_binary(Bin) ->
|
|
Bin.
|
|
|
|
opents_server_url(Host, Port) ->
|
|
iolist_to_binary([
|
|
"http://",
|
|
Host,
|
|
":",
|
|
integer_to_binary(Port)
|
|
]).
|
|
|
|
is_success_check({ok, 200, #{failed := Failed}}) ->
|
|
?assertEqual(0, Failed);
|
|
is_success_check(Ret) ->
|
|
?assert(false, Ret).
|
|
|
|
is_error_check(Result) ->
|
|
?assertMatch({error, {400, #{failed := 1}}}, Result).
|
|
|
|
opentds_query(Config, Metric) ->
|
|
Path = <<"/api/query">>,
|
|
Opts = #{return_all => true},
|
|
Body = #{
|
|
start => <<"1h-ago">>,
|
|
queries => [
|
|
#{
|
|
aggregator => <<"last">>,
|
|
metric => Metric,
|
|
tags => #{
|
|
host => <<"*">>
|
|
}
|
|
}
|
|
],
|
|
showTSUID => false,
|
|
showQuery => false,
|
|
delete => false
|
|
},
|
|
opentsdb_request(Config, Path, Body, Opts).
|
|
|
|
opentsdb_request(Config, Path, Body) ->
|
|
opentsdb_request(Config, Path, Body, #{}).
|
|
|
|
opentsdb_request(Config, Path, Body, Opts) ->
|
|
Host = ?config(bridge_host, Config),
|
|
Port = ?config(bridge_port, Config),
|
|
ServerURL = opents_server_url(Host, Port),
|
|
URL = <<ServerURL/binary, Path/binary>>,
|
|
emqx_mgmt_api_test_util:request_api(post, URL, [], [], Body, Opts).
|
|
|
|
make_data(Metric, Value) ->
|
|
#{
|
|
metric => Metric,
|
|
tags => #{
|
|
<<"host">> => <<"serverA">>
|
|
},
|
|
value => Value
|
|
}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_query_simple(Config) ->
|
|
Metric = <<"t_query_simple">>,
|
|
Value = 12,
|
|
MakeMessageFun = fun() -> make_data(Metric, Value) end,
|
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
|
Config, MakeMessageFun, fun is_success_check/1, opents_bridge_on_query
|
|
),
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
|
|
QResult = emqx_utils_json:decode(IoTDBResult),
|
|
?assertMatch(
|
|
[
|
|
#{
|
|
<<"metric">> := Metric,
|
|
<<"dps">> := _
|
|
}
|
|
],
|
|
QResult
|
|
),
|
|
[#{<<"dps">> := Dps}] = QResult,
|
|
?assertMatch([Value | _], maps:values(Dps)).
|
|
|
|
t_create_via_http(Config) ->
|
|
emqx_bridge_v2_testlib:t_create_via_http(Config).
|
|
|
|
t_start_stop(Config) ->
|
|
emqx_bridge_v2_testlib:t_start_stop(Config, opents_bridge_stopped).
|
|
|
|
t_on_get_status(Config) ->
|
|
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
|
|
|
|
t_query_invalid_data(Config) ->
|
|
Metric = <<"t_query_invalid_data">>,
|
|
Value = 12,
|
|
MakeMessageFun = fun() -> maps:remove(value, make_data(Metric, Value)) end,
|
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
|
Config, MakeMessageFun, fun is_error_check/1, opents_bridge_on_query
|
|
).
|
|
|
|
t_tags_validator(Config) ->
|
|
%% Create without data configured
|
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
|
|
|
?assertMatch(
|
|
{ok, _},
|
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
|
<<"parameters">> => #{
|
|
<<"data">> => [
|
|
#{
|
|
<<"metric">> => <<"${metric}">>,
|
|
<<"tags">> => <<"${tags}">>,
|
|
<<"value">> => <<"${payload.value}">>
|
|
}
|
|
]
|
|
}
|
|
})
|
|
),
|
|
|
|
?assertMatch(
|
|
{error, _},
|
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
|
<<"parameters">> => #{
|
|
<<"data">> => [
|
|
#{
|
|
<<"metric">> => <<"${metric}">>,
|
|
<<"tags">> => <<"text">>,
|
|
<<"value">> => <<"${payload.value}">>
|
|
}
|
|
]
|
|
}
|
|
})
|
|
).
|
|
|
|
t_raw_int_value(Config) ->
|
|
raw_value_test(<<"t_raw_int_value">>, 42, Config).
|
|
|
|
t_raw_float_value(Config) ->
|
|
raw_value_test(<<"t_raw_float_value">>, 42.5, Config).
|
|
|
|
t_list_tags(Config) ->
|
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
|
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
|
|
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
|
|
?retry(
|
|
_Sleep = 1_000,
|
|
_Attempts = 10,
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
),
|
|
|
|
?assertMatch(
|
|
{ok, _},
|
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
|
<<"parameters">> => #{
|
|
<<"data">> => [
|
|
#{
|
|
<<"metric">> => <<"${metric}">>,
|
|
<<"tags">> => #{<<"host">> => <<"valueA">>},
|
|
value => <<"${value}">>
|
|
}
|
|
]
|
|
}
|
|
})
|
|
),
|
|
|
|
Metric = <<"t_list_tags">>,
|
|
Value = 12,
|
|
MakeMessageFun = fun() -> make_data(Metric, Value) end,
|
|
|
|
is_success_check(
|
|
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
|
|
),
|
|
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
|
|
QResult = emqx_utils_json:decode(IoTDBResult),
|
|
?assertMatch(
|
|
[
|
|
#{
|
|
<<"metric">> := Metric,
|
|
<<"tags">> := #{<<"host">> := <<"valueA">>}
|
|
}
|
|
],
|
|
QResult
|
|
).
|
|
|
|
t_list_tags_with_var(Config) ->
|
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
|
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
|
|
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
|
|
?retry(
|
|
_Sleep = 1_000,
|
|
_Attempts = 10,
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
),
|
|
|
|
?assertMatch(
|
|
{ok, _},
|
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
|
<<"parameters">> => #{
|
|
<<"data">> => [
|
|
#{
|
|
<<"metric">> => <<"${metric}">>,
|
|
<<"tags">> => #{<<"host">> => <<"${value}">>},
|
|
value => <<"${value}">>
|
|
}
|
|
]
|
|
}
|
|
})
|
|
),
|
|
|
|
Metric = <<"t_list_tags_with_var">>,
|
|
Value = 12,
|
|
MakeMessageFun = fun() -> make_data(Metric, Value) end,
|
|
|
|
is_success_check(
|
|
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
|
|
),
|
|
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
|
|
QResult = emqx_utils_json:decode(IoTDBResult),
|
|
?assertMatch(
|
|
[
|
|
#{
|
|
<<"metric">> := Metric,
|
|
<<"tags">> := #{<<"host">> := <<"12">>}
|
|
}
|
|
],
|
|
QResult
|
|
).
|
|
|
|
raw_value_test(Metric, RawValue, Config) ->
|
|
?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)),
|
|
ResourceId = emqx_bridge_v2_testlib:resource_id(Config),
|
|
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
|
|
?retry(
|
|
_Sleep = 1_000,
|
|
_Attempts = 10,
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
),
|
|
|
|
?assertMatch(
|
|
{ok, _},
|
|
emqx_bridge_v2_testlib:update_bridge_api(Config, #{
|
|
<<"parameters">> => #{
|
|
<<"data">> => [
|
|
#{
|
|
<<"metric">> => <<"${metric}">>,
|
|
<<"tags">> => <<"${tags}">>,
|
|
<<"value">> => RawValue
|
|
}
|
|
]
|
|
}
|
|
})
|
|
),
|
|
|
|
Value = 12,
|
|
MakeMessageFun = fun() -> make_data(Metric, Value) end,
|
|
|
|
is_success_check(
|
|
emqx_resource:simple_sync_query(ResourceId, {BridgeId, MakeMessageFun()})
|
|
),
|
|
|
|
{ok, {{_, 200, _}, _, IoTDBResult}} = opentds_query(Config, Metric),
|
|
QResult = emqx_utils_json:decode(IoTDBResult),
|
|
?assertMatch(
|
|
[
|
|
#{
|
|
<<"metric">> := Metric,
|
|
<<"dps">> := _
|
|
}
|
|
],
|
|
QResult
|
|
),
|
|
[#{<<"dps">> := Dps}] = QResult,
|
|
?assertMatch([RawValue | _], maps:values(Dps)).
|