test: check data at iotdb instance

This commit is contained in:
Stefan Strigler 2023-05-29 16:37:09 +02:00
parent a3021c58f1
commit 860d7b169a
1 changed files with 148 additions and 66 deletions

View File

@ -67,7 +67,7 @@ end_per_group(_Group, _Config) ->
init_per_testcase(TestCase, Config0) -> init_per_testcase(TestCase, Config0) ->
Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3),
reset_service(Config), iotdb_reset(Config),
Config. Config.
end_per_testcase(TestCase, Config) -> end_per_testcase(TestCase, Config) ->
@ -76,6 +76,13 @@ end_per_testcase(TestCase, Config) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
iotdb_server_url(Host, Port) ->
iolist_to_binary([
"http://",
Host,
":",
integer_to_binary(Port)
]).
bridge_config(TestCase, _TestGroup, Config) -> bridge_config(TestCase, _TestGroup, Config) ->
UniqueNum = integer_to_binary(erlang:unique_integer()), UniqueNum = integer_to_binary(erlang:unique_integer()),
@ -84,12 +91,7 @@ bridge_config(TestCase, _TestGroup, Config) ->
Name = << Name = <<
(atom_to_binary(TestCase))/binary, UniqueNum/binary (atom_to_binary(TestCase))/binary, UniqueNum/binary
>>, >>,
ServerURL = iolist_to_binary([ ServerURL = iotdb_server_url(Host, Port),
"http://",
Host,
":",
integer_to_binary(Port)
]),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"bridges.iotdb.~s {\n" "bridges.iotdb.~s {\n"
@ -114,36 +116,19 @@ bridge_config(TestCase, _TestGroup, Config) ->
), ),
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}.
reset_service(Config) ->
_BridgeConfig =
#{
<<"base_url">> := BaseURL,
<<"authentication">> := #{
<<"username">> := Username,
<<"password">> := Password
}
} =
?config(bridge_config, Config),
ct:pal("bridge config: ~p", [_BridgeConfig]),
Path = <<BaseURL/binary, "/rest/v2/nonQuery">>,
BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
Headers = [
{"Content-type", "application/json"},
{"Authorization", binary_to_list(BasicToken)}
],
Device = iotdb_device(Config),
Body = #{sql => <<"delete from ", Device/binary, ".*">>},
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}).
make_iotdb_payload(DeviceId, Measurement, Type, Value) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
#{ #{
measurement => s_to_b(Measurement), measurement => s_to_b(Measurement),
data_type => s_to_b(Type), data_type => s_to_b(Type),
value => s_to_b(Value), value => s_to_b(Value),
device_id => DeviceId, device_id => DeviceId,
is_aligned => false is_aligned => true
}. }.
make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) ->
Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value),
Payload#{timestamp => Timestamp}.
s_to_b(S) when is_list(S) -> list_to_binary(S); s_to_b(S) when is_list(S) -> list_to_binary(S);
s_to_b(V) -> V. s_to_b(V) -> V.
@ -163,6 +148,39 @@ iotdb_device(Config) ->
Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]),
<<"root.", Device/binary>>. <<"root.", Device/binary>>.
iotdb_request(Config, Path, Body) ->
iotdb_request(Config, Path, Body, #{}).
iotdb_request(Config, Path, Body, Opts) ->
_BridgeConfig =
#{
<<"base_url">> := BaseURL,
<<"authentication">> := #{
<<"username">> := Username,
<<"password">> := Password
}
} =
?config(bridge_config, Config),
ct:pal("bridge config: ~p", [_BridgeConfig]),
URL = <<BaseURL/binary, Path/binary>>,
BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
Headers = [
{"Content-type", "application/json"},
{"Authorization", binary_to_list(BasicToken)}
],
emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts).
iotdb_reset(Config) ->
Device = iotdb_device(Config),
Body = #{sql => <<"delete from ", Device/binary, ".*">>},
{ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body).
iotdb_query(Config, Query) ->
Path = <<"/rest/v2/query">>,
Opts = #{return_all => true},
Body = #{sql => Query},
iotdb_request(Config, Path, Body, Opts).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Testcases %% Testcases
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -175,7 +193,15 @@ t_sync_query_simple(Config) ->
fun(Result) -> fun(Result) ->
?assertEqual(ok, element(1, Result)) ?assertEqual(ok, element(1, Result))
end, end,
emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). ok = emqx_bridge_testlib:t_sync_query(
Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
),
Query = <<"select temp from ", DeviceId/binary>>,
{ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
?assertMatch(
#{<<"values">> := [[36]]},
emqx_utils_json:decode(IoTDBResult)
).
t_async_query(Config) -> t_async_query(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
@ -185,54 +211,110 @@ t_async_query(Config) ->
fun(Result) -> fun(Result) ->
?assertEqual(ok, element(1, Result)) ?assertEqual(ok, element(1, Result))
end, end,
emqx_bridge_testlib:t_async_query( ok = emqx_bridge_testlib:t_async_query(
Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async
),
Query = <<"select temp from ", DeviceId/binary>>,
{ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
?assertMatch(
#{<<"values">> := [[36]]},
emqx_utils_json:decode(IoTDBResult)
). ).
t_sync_query_aggregated(Config) -> t_sync_query_aggregated(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = [ Payload = [
make_iotdb_payload(DeviceId, "temp", "INT32", "36"), make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290),
make_iotdb_payload(DeviceId, "temp", "INT32", 36), make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291),
make_iotdb_payload(DeviceId, "temp", "INT32", 36.7), make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292),
(make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>}, make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>),
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>}, make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294),
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295),
timestamp => <<"1685112026296">> make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296),
}, %% implicit 'now()' timestamp
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ make_iotdb_payload(DeviceId, "temp", "INT32", "40"),
timestamp => 1685112026296 %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB
}, (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>},
make_iotdb_payload(DeviceId, "temp", "INT64", "36"), (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>},
make_iotdb_payload(DeviceId, "temp", "INT64", 36),
make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290),
make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291),
make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292),
make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293),
make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"), make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294),
make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"), make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295),
make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"),
make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"), make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300),
make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0), make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300),
make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false), make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300),
make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300),
make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300),
make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300),
make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null), make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300),
make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300),
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300),
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300),
make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"), make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300),
make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3), make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300),
make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87), make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300),
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar")
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300)
], ],
MakeMessageFun = make_message_fun(DeviceId, Payload), MakeMessageFun = make_message_fun(DeviceId, Payload),
IsSuccessCheck = IsSuccessCheck =
fun(Result) -> fun(Result) ->
?assertEqual(ok, element(1, Result)) ?assertEqual(ok, element(1, Result))
end, end,
emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). ok = emqx_bridge_testlib:t_sync_query(
Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query
),
%% check temp
QueryTemp = <<"select temp from ", DeviceId/binary>>,
{ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp),
?assertMatch(
#{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]},
emqx_utils_json:decode(ResultTemp)
),
%% check weight
QueryWeight = <<"select weight from ", DeviceId/binary>>,
{ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight),
?assertMatch(
#{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]},
emqx_utils_json:decode(ResultWeight)
),
%% check rest ts = 1685112026300
QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>,
{ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest),
#{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode(
ResultRest
),
Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)),
Exp = #{
exp(DeviceId, "charged") => true,
exp(DeviceId, "floated") => true,
exp(DeviceId, "started") => true,
exp(DeviceId, "stoked") => true,
exp(DeviceId, "enriched") => true,
exp(DeviceId, "gutted") => true,
exp(DeviceId, "drained") => false,
exp(DeviceId, "toasted") => false,
exp(DeviceId, "uncharted") => false,
exp(DeviceId, "dazzled") => false,
exp(DeviceId, "unplugged") => false,
exp(DeviceId, "unraveled") => false,
exp(DeviceId, "undecided") => null,
exp(DeviceId, "foo") => <<"bar">>,
exp(DeviceId, "temp") => null,
exp(DeviceId, "weight") => null
},
?assertEqual(Exp, Results),
ok.
exp(Dev, M0) ->
M = s_to_b(M0),
<<Dev/binary, ".", M/binary>>.
t_sync_query_fail(Config) -> t_sync_query_fail(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),