diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 54f60bbc2..a6569e067 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -67,7 +67,7 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config0) -> Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), - reset_service(Config), + iotdb_reset(Config), Config. end_per_testcase(TestCase, Config) -> @@ -76,6 +76,13 @@ end_per_testcase(TestCase, Config) -> %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ +iotdb_server_url(Host, Port) -> + iolist_to_binary([ + "http://", + Host, + ":", + integer_to_binary(Port) + ]). bridge_config(TestCase, _TestGroup, Config) -> UniqueNum = integer_to_binary(erlang:unique_integer()), @@ -84,12 +91,7 @@ bridge_config(TestCase, _TestGroup, Config) -> Name = << (atom_to_binary(TestCase))/binary, UniqueNum/binary >>, - ServerURL = iolist_to_binary([ - "http://", - Host, - ":", - integer_to_binary(Port) - ]), + ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( "bridges.iotdb.~s {\n" @@ -114,36 +116,19 @@ bridge_config(TestCase, _TestGroup, Config) -> ), {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. -reset_service(Config) -> - _BridgeConfig = - #{ - <<"base_url">> := BaseURL, - <<"authentication">> := #{ - <<"username">> := Username, - <<"password">> := Password - } - } = - ?config(bridge_config, Config), - ct:pal("bridge config: ~p", [_BridgeConfig]), - Path = <>, - BasicToken = base64:encode(<>), - Headers = [ - {"Content-type", "application/json"}, - {"Authorization", binary_to_list(BasicToken)} - ], - Device = iotdb_device(Config), - Body = #{sql => <<"delete from ", Device/binary, ".*">>}, - {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}). - make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ measurement => s_to_b(Measurement), data_type => s_to_b(Type), value => s_to_b(Value), device_id => DeviceId, - is_aligned => false + is_aligned => true }. +make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) -> + Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value), + Payload#{timestamp => Timestamp}. + s_to_b(S) when is_list(S) -> list_to_binary(S); s_to_b(V) -> V. @@ -163,6 +148,39 @@ iotdb_device(Config) -> Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), <<"root.", Device/binary>>. +iotdb_request(Config, Path, Body) -> + iotdb_request(Config, Path, Body, #{}). + +iotdb_request(Config, Path, Body, Opts) -> + _BridgeConfig = + #{ + <<"base_url">> := BaseURL, + <<"authentication">> := #{ + <<"username">> := Username, + <<"password">> := Password + } + } = + ?config(bridge_config, Config), + ct:pal("bridge config: ~p", [_BridgeConfig]), + URL = <>, + BasicToken = base64:encode(<>), + Headers = [ + {"Content-type", "application/json"}, + {"Authorization", binary_to_list(BasicToken)} + ], + emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts). + +iotdb_reset(Config) -> + Device = iotdb_device(Config), + Body = #{sql => <<"delete from ", Device/binary, ".*">>}, + {ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body). + +iotdb_query(Config, Query) -> + Path = <<"/rest/v2/query">>, + Opts = #{return_all => true}, + Body = #{sql => Query}, + iotdb_request(Config, Path, Body, Opts). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -175,7 +193,15 @@ t_sync_query_simple(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + ok = emqx_bridge_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) + ). t_async_query(Config) -> DeviceId = iotdb_device(Config), @@ -185,54 +211,110 @@ t_async_query(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_async_query( + ok = emqx_bridge_testlib:t_async_query( Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) ). t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), Payload = [ - make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - make_iotdb_payload(DeviceId, "temp", "INT32", 36), - make_iotdb_payload(DeviceId, "temp", "INT32", 36.7), - (make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ - timestamp => <<"1685112026296">> - }, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ - timestamp => 1685112026296 - }, - make_iotdb_payload(DeviceId, "temp", "INT64", "36"), - make_iotdb_payload(DeviceId, "temp", "INT64", 36), - make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), - make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), - make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), - make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), - make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"), - make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"), - make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"), - make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"), - make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0), - make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false), - make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), - make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), - make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), - make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null), - make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87), - make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") + make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290), + make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291), + make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292), + make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>), + make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294), + make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295), + make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296), + %% implicit 'now()' timestamp + make_iotdb_payload(DeviceId, "temp", "INT32", "40"), + %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB + (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>}, + + make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295), + + make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300), + make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300), + make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300), + make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300), + make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300), + make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300), + make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300), + make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300), + make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300), + make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300), + make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300), + + make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + ok = emqx_bridge_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ), + + %% check temp + QueryTemp = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), + ?assertMatch( + #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]}, + emqx_utils_json:decode(ResultTemp) + ), + %% check weight + QueryWeight = <<"select weight from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight), + ?assertMatch( + #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]}, + emqx_utils_json:decode(ResultWeight) + ), + %% check rest ts = 1685112026300 + QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>, + {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), + #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( + ResultRest + ), + Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), + Exp = #{ + exp(DeviceId, "charged") => true, + exp(DeviceId, "floated") => true, + exp(DeviceId, "started") => true, + exp(DeviceId, "stoked") => true, + exp(DeviceId, "enriched") => true, + exp(DeviceId, "gutted") => true, + exp(DeviceId, "drained") => false, + exp(DeviceId, "toasted") => false, + exp(DeviceId, "uncharted") => false, + exp(DeviceId, "dazzled") => false, + exp(DeviceId, "unplugged") => false, + exp(DeviceId, "unraveled") => false, + exp(DeviceId, "undecided") => null, + exp(DeviceId, "foo") => <<"bar">>, + exp(DeviceId, "temp") => null, + exp(DeviceId, "weight") => null + }, + ?assertEqual(Exp, Results), + + ok. + +exp(Dev, M0) -> + M = s_to_b(M0), + <>. t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config),