diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 5b4323e7c..025451988 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -32,7 +32,7 @@ init_per_group(TestGroup, BridgeType, Config) -> {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer([positive])), - MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>, [ {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 3b3f323b0..0811807db 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -8,6 +8,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE_BIN, <<"iotdb">>). -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]). @@ -143,9 +144,15 @@ make_message_fun(Topic, Payload) -> } end. +iotdb_topic(Config) -> + ?config(mqtt_topic, Config). + iotdb_device(Config) -> - MQTTTopic = ?config(mqtt_topic, Config), - Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), + Topic = iotdb_topic(Config), + topic_to_iotdb_device(Topic). + +topic_to_iotdb_device(Topic) -> + Device = re:replace(Topic, "/", ".", [global, {return, binary}]), <<"root.", Device/binary>>. iotdb_request(Config, Path, Body) -> @@ -172,6 +179,9 @@ iotdb_request(Config, Path, Body, Opts) -> iotdb_reset(Config) -> Device = iotdb_device(Config), + iotdb_reset(Config, Device). + +iotdb_reset(Config, Device) -> Body = #{sql => <<"delete from ", Device/binary, ".*">>}, {ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body). @@ -181,6 +191,12 @@ iotdb_query(Config, Query) -> Body = #{sql => Query}, iotdb_request(Config, Path, Body, Opts). +is_success_check({ok, 200, _, Body}) -> + ?assert(is_code(200, emqx_utils_json:decode(Body))). + +is_code(Code, #{<<"code">> := Code}) -> true; +is_code(_, _) -> false. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -188,13 +204,9 @@ iotdb_query(Config, Query) -> t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_sync_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), Query = <<"select temp from ", DeviceId/binary>>, {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), @@ -206,13 +218,9 @@ t_sync_query_simple(Config) -> t_async_query(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_async_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async ), Query = <<"select temp from ", DeviceId/binary>>, {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), @@ -260,13 +268,9 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_sync_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), %% check temp @@ -319,7 +323,7 @@ exp(Dev, M0) -> t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"), - MakeMessageFun = make_message_fun(DeviceId, Payload), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), IsSuccessCheck = fun(Result) -> ?assertEqual(error, element(1, Result)) @@ -327,7 +331,6 @@ t_sync_query_fail(Config) -> emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). t_sync_query_badpayload(Config) -> - DeviceId = iotdb_device(Config), BadPayload = #{foo => bar}, IsSuccessCheck = fun(Result) -> @@ -335,14 +338,13 @@ t_sync_query_badpayload(Config) -> end, emqx_bridge_testlib:t_sync_query( Config, - make_message_fun(DeviceId, BadPayload), + make_message_fun(iotdb_topic(Config), BadPayload), IsSuccessCheck, iotdb_bridge_on_query ), ok. t_async_query_badpayload(Config) -> - DeviceId = iotdb_device(Config), BadPayload = #{foo => bar}, IsSuccessCheck = fun(Result) -> @@ -350,7 +352,7 @@ t_async_query_badpayload(Config) -> end, emqx_bridge_testlib:t_async_query( Config, - make_message_fun(DeviceId, BadPayload), + make_message_fun(iotdb_topic(Config), BadPayload), IsSuccessCheck, iotdb_bridge_on_query_async ), @@ -364,3 +366,78 @@ t_start_stop(Config) -> t_on_get_status(Config) -> emqx_bridge_testlib:t_on_get_status(Config). + +t_device_id(Config) -> + ResourceId = emqx_bridge_testlib:resource_id(Config), + %% Create without device_id configured + ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ConfiguredDevice = <<"root.someOtherDevice234">>, + DeviceId = <<"root.deviceFooBar123">>, + Topic = <<"some/random/topic">>, + TopicDevice = topic_to_iotdb_device(Topic), + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), + MessageF1 = make_message_fun(Topic, Payload1), + ?assertNotEqual(DeviceId, TopicDevice), + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + ), + {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), + ?assertNotEqual([], Values1_1), + {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2), + ?assertEqual([], Values1_2), + + %% test without device_id in message, taking it from topic + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)), + MessageF2 = make_message_fun(Topic, Payload2), + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()}) + ), + {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), + ?assertEqual([], Values2_1), + {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), + ?assertNotEqual([], Values2_2), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + + %% reconfigure bridge with device_id + {ok, _} = + emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}), + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + ), + + %% even though we had a device_id in the message it's not being used + {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1), + ?assertEqual([], Values3_1), + {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2), + ?assertEqual([], Values3_2), + {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query( + Config, <<"select * from ", ConfiguredDevice/binary>> + ), + #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3), + ?assertNotEqual([], Values3_3), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + ok.