From 6f54220a518e62b952194630b82f962f72e57f8c Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 26 May 2023 13:52:16 +0200 Subject: [PATCH 01/12] feat(emqx_bridge_iotdb): handle bad message format gracefully --- .../src/emqx_bridge_iotdb_impl.erl | 118 +++++++++++------- 1 file changed, 73 insertions(+), 45 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index f19fc0839..abb09ad5c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -110,15 +110,19 @@ on_query(InstanceId, {send_message, Message}, State) -> send_message => Message, state => emqx_utils:redact(State) }), - IoTDBPayload = make_iotdb_insert_request(Message, State), - handle_response( - emqx_connector_http:on_query( - InstanceId, {send_message, IoTDBPayload}, State - ) - ). + case make_iotdb_insert_request(Message, State) of + {ok, IoTDBPayload} -> + handle_response( + emqx_connector_http:on_query( + InstanceId, {send_message, IoTDBPayload}, State + ) + ); + Error -> + Error + end. -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> - {ok, pid()}. + {ok, pid()} | {error, empty_request}. on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_async_called", @@ -126,18 +130,22 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> send_message => Message, state => emqx_utils:redact(State) }), - IoTDBPayload = make_iotdb_insert_request(Message, State), - ReplyFunAndArgs = - { - fun(Result) -> - Response = handle_response(Result), - emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) - end, - [] - }, - emqx_connector_http:on_query_async( - InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State - ). + case make_iotdb_insert_request(Message, State) of + {ok, IoTDBPayload} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_connector_http:on_query_async( + InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State + ); + Error -> + Error + end. %%-------------------------------------------------------------------- %% Internal Functions @@ -160,27 +168,42 @@ make_parsed_payload( <<"value">> => Value }. +preproc_data_list(DataList) -> + lists:foldl( + fun preproc_data/2, + [], + DataList + ). + preproc_data( #{ <<"measurement">> := Measurement, <<"data_type">> := DataType, <<"value">> := Value - } = Data + } = Data, + Acc ) -> - #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( - maps:get(<<"timestamp">>, Data, <<"now">>) - ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) - }. - -preproc_data_list(DataList) -> - lists:map( - fun preproc_data/1, - DataList - ). + [ + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + } + | Acc + ]; +preproc_data(_NoMatch, Acc) -> + ?SLOG( + warning, + #{ + msg => "iotdb_bridge_preproc_data_failed", + required_fields => ['measurement', 'data_type', 'value'], + received => _NoMatch + } + ), + Acc. proc_data(PreProcessedData, Msg) -> NowNS = erlang:system_time(nanosecond), @@ -282,18 +305,23 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) -> DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), Payload = make_list(maps:get(payload, Message)), - PreProcessedData = preproc_data_list(Payload), - DataList = proc_data(PreProcessedData, Message), - InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - }). + case preproc_data_list(Payload) of + [] -> + {error, invalid_data}; + PreProcessedData -> + DataList = proc_data(PreProcessedData, Message), + InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + {ok, + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + })} + end. -replace_dtypes(Rows, IotDBVsn) -> - {Types, Map} = maps:take(dtypes, Rows), - Map#{iotdb_field_key(data_types, IotDBVsn) => Types}. +replace_dtypes(Rows0, IotDBVsn) -> + {Types, Rows} = maps:take(dtypes, Rows0), + Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}. aggregate_rows(DataList, InitAcc) -> lists:foldr( From 3d3f2a223c2461852d26a628751268ffef3606e4 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 26 May 2023 18:16:05 +0200 Subject: [PATCH 02/12] test: test more value conversions --- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src | 2 +- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index cebf60cb1..a3e4f1eb3 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 434587cf0..d21d0c372 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -191,14 +191,19 @@ t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), Payload = [ make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"mow_us">>}, - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"mow_ns">>}, + (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"now_ns">>}, + (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{ + timestamp => <<"1685112026296">> + }, make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"), make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"), make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>), + make_iotdb_payload(DeviceId, "gutted", <<"BOOLEAN">>, <<"True">>), make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"), make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"), make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>), + make_iotdb_payload(DeviceId, "unraveled", <<"BOOLEAN">>, <<"False">>), make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"), make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>) ], From 64d582770d6323bedbf3f40b80d5ff4476568ad4 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Fri, 26 May 2023 18:16:26 +0200 Subject: [PATCH 03/12] test: add tracepoints --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 25 ++++++++++++++----- .../src/emqx_bridge_iotdb_impl.erl | 4 ++- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 10 +++++--- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 47f29aa36..383290027 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -203,7 +203,7 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) -> %% Testcases %%------------------------------------------------------------------------------ -t_sync_query(Config, MakeMessageFun, IsSuccessCheck) -> +t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ResourceId = resource_id(Config), ?check_trace( begin @@ -217,11 +217,13 @@ t_sync_query(Config, MakeMessageFun, IsSuccessCheck) -> IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)), ok end, - [] + fun(Trace) -> + ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) + end ), ok. -t_async_query(Config, MakeMessageFun, IsSuccessCheck) -> +t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ResourceId = resource_id(Config), ReplyFun = fun(Pid, Result) -> @@ -236,10 +238,21 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), Message = {send_message, MakeMessageFun()}, - emqx_resource:query(ResourceId, Message, #{async_reply_fun => {ReplyFun, [self()]}}), + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqx_resource:query(ResourceId, Message, #{ + async_reply_fun => {ReplyFun, [self()]} + }), + #{?snk_kind := TracePoint, instance_id := ResourceId}, + 5_000 + ) + ), ok end, - [] + fun(Trace) -> + ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace)) + end ), receive {result, Result} -> IsSuccessCheck(Result) @@ -318,7 +331,7 @@ t_start_stop(Config, StopTracePoint) -> end, fun(Trace) -> %% one for each probe, one for real - ?assertMatch([_, _, _], ?of_kind(StopTracePoint, Trace)), + ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), ok end ), diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index abb09ad5c..667266c82 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -72,7 +72,7 @@ on_start(InstanceId, Config) -> instance_id => InstanceId, request => maps:get(request, State, <<>>) }), - ?tp(iotdb_bridge_started, #{}), + ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), {ok, maps:merge(Config, State)}; {error, Reason} -> ?SLOG(error, #{ @@ -104,6 +104,7 @@ on_get_status(InstanceId, State) -> | {ok, pos_integer(), [term()]} | {error, term()}. on_query(InstanceId, {send_message, Message}, State) -> + ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_called", instance_id => InstanceId, @@ -124,6 +125,7 @@ on_query(InstanceId, {send_message, Message}, State) -> -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> + ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_async_called", instance_id => InstanceId, diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index d21d0c372..5062fb6b9 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -175,7 +175,7 @@ t_sync_query_simple(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). t_async_query(Config) -> DeviceId = iotdb_device(Config), @@ -185,7 +185,9 @@ t_async_query(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_async_query(Config, MakeMessageFun, IsSuccessCheck). + emqx_bridge_testlib:t_async_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async + ). t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), @@ -212,7 +214,7 @@ t_sync_query_aggregated(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), @@ -222,7 +224,7 @@ t_sync_query_fail(Config) -> fun(Result) -> ?assertEqual(error, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck). + emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). t_create_via_http(Config) -> emqx_bridge_testlib:t_create_via_http(Config). From 1381b54a8d46e072e6aab2051535cf9465ceb6fb Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 29 May 2023 12:07:18 +0200 Subject: [PATCH 04/12] fix(emqx_bridge_iotdb): allow non-binary values --- .../src/emqx_bridge_iotdb_impl.erl | 7 ++- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 60 ++++++++++++------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 667266c82..289b8f027 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -192,7 +192,7 @@ preproc_data( ), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) + value => maybe_preproc_tmpl(Value) } | Acc ]; @@ -207,6 +207,11 @@ preproc_data(_NoMatch, Acc) -> ), Acc. +maybe_preproc_tmpl(Value) when is_binary(Value) -> + emqx_plugin_libs_rule:preproc_tmpl(Value); +maybe_preproc_tmpl(Value) -> + Value. + proc_data(PreProcessedData, Msg) -> NowNS = erlang:system_time(nanosecond), Nows = #{ diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 5062fb6b9..65bb977d1 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -140,13 +140,16 @@ make_iotdb_payload(DeviceId) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ - measurement => Measurement, - data_type => Type, - value => Value, + measurement => s_to_b(Measurement), + data_type => s_to_b(Type), + value => s_to_b(Value), device_id => DeviceId, is_aligned => false }. +s_to_b(S) when is_list(S) -> list_to_binary(S); +s_to_b(V) -> V. + make_message_fun(Topic, Payload) -> fun() -> MsgId = erlang:unique_integer([positive]), @@ -169,7 +172,7 @@ iotdb_device(Config) -> t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> @@ -179,7 +182,7 @@ t_sync_query_simple(Config) -> t_async_query(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> @@ -192,22 +195,39 @@ t_async_query(Config) -> t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), Payload = [ - make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{ + make_iotdb_payload(DeviceId, "temp", "INT32", "36"), + make_iotdb_payload(DeviceId, "temp", "INT32", 36), + make_iotdb_payload(DeviceId, "temp", "INT32", 36.7), + (make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ timestamp => <<"1685112026296">> }, - make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"), - make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"), - make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>), - make_iotdb_payload(DeviceId, "gutted", <<"BOOLEAN">>, <<"True">>), - make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"), - make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"), - make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>), - make_iotdb_payload(DeviceId, "unraveled", <<"BOOLEAN">>, <<"False">>), - make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"), - make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>) + make_iotdb_payload(DeviceId, "temp", "INT64", "36"), + make_iotdb_payload(DeviceId, "temp", "INT64", 36), + make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), + (make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>}, + (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{ + timestamp => <<"1685112026296">> + }, + make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), + make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), + make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"), + make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"), + make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"), + make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0), + make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false), + make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), + make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), + make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", undefined), + make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), + make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") ], MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = @@ -218,7 +238,7 @@ t_sync_query_aggregated(Config) -> t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> From bd92116ceec3298c81aba9accb18d8191d9b770a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 29 May 2023 14:21:53 +0200 Subject: [PATCH 05/12] test: fix data in calls to look like real world --- .../src/emqx_bridge_iotdb_impl.erl | 20 ++++--------------- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 4 ++-- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 289b8f027..2146fb51c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -154,21 +154,9 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %%-------------------------------------------------------------------- make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> - emqx_utils_json:decode(PayloadUnparsed, [return_maps]); + emqx_utils_json:decode(PayloadUnparsed); make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> - lists:map(fun make_parsed_payload/1, PayloadUnparsed); -make_parsed_payload( - #{ - measurement := Measurement, - data_type := DataType, - value := Value - } = Data -) -> - Data#{ - <<"measurement">> => Measurement, - <<"data_type">> => DataType, - <<"value">> => Value - }. + lists:map(fun make_parsed_payload/1, PayloadUnparsed). preproc_data_list(DataList) -> lists:foldl( @@ -270,6 +258,7 @@ replace_var(Val, _Data) -> Val. convert_bool(B) when is_boolean(B) -> B; +convert_bool(null) -> null; convert_bool(1) -> true; convert_bool(0) -> false; convert_bool(<<"1">>) -> true; @@ -279,8 +268,7 @@ convert_bool(<<"True">>) -> true; convert_bool(<<"TRUE">>) -> true; convert_bool(<<"false">>) -> false; convert_bool(<<"False">>) -> false; -convert_bool(<<"FALSE">>) -> false; -convert_bool(undefined) -> null. +convert_bool(<<"FALSE">>) -> false. convert_int(Int) when is_integer(Int) -> Int; convert_int(Float) when is_float(Float) -> floor(Float); diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 65bb977d1..0a6f7d777 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -156,7 +156,7 @@ make_message_fun(Topic, Payload) -> #{ topic => Topic, id => MsgId, - payload => Payload, + payload => emqx_utils_json:encode(Payload), retain => true } end. @@ -223,7 +223,7 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), - make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", undefined), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null), make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), From a3021c58f15e2e663082b122125219d6555e54f9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 29 May 2023 14:45:30 +0200 Subject: [PATCH 06/12] fix(emqx_bridge_iotdb): allow integer timestamp --- .../src/emqx_bridge_iotdb_impl.erl | 11 +++++++---- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 14 ++++++-------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 2146fb51c..e8eb8efb0 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -175,7 +175,7 @@ preproc_data( ) -> [ #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( + timestamp => maybe_preproc_tmpl( maps:get(<<"timestamp">>, Data, <<"now">>) ), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), @@ -217,9 +217,7 @@ proc_data(PreProcessedData, Msg) -> } ) -> #{ - timestamp => iot_timestamp( - emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows - ), + timestamp => iot_timestamp(TimestampTkn, Msg, Nows), measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), data_type => DataType, value => proc_value(DataType, ValueTkn, Msg) @@ -228,6 +226,11 @@ proc_data(PreProcessedData, Msg) -> PreProcessedData ). +iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> + Timestamp; +iot_timestamp(TimestampTkn, Msg, Nows) -> + iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows). + iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 0a6f7d777..54f60bbc2 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -135,9 +135,6 @@ reset_service(Config) -> Body = #{sql => <<"delete from ", Device/binary, ".*">>}, {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}). -make_iotdb_payload(DeviceId) -> - make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"). - make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ measurement => s_to_b(Measurement), @@ -203,14 +200,12 @@ t_sync_query_aggregated(Config) -> (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ timestamp => <<"1685112026296">> }, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ + timestamp => 1685112026296 + }, make_iotdb_payload(DeviceId, "temp", "INT64", "36"), make_iotdb_payload(DeviceId, "temp", "INT64", 36), make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), - (make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{ - timestamp => <<"1685112026296">> - }, make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), @@ -227,6 +222,9 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87), make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") ], MakeMessageFun = make_message_fun(DeviceId, Payload), From 860d7b169ace09625158d16d61abc956bbb88faf Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 29 May 2023 16:37:09 +0200 Subject: [PATCH 07/12] test: check data at iotdb instance --- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 214 ++++++++++++------ 1 file changed, 148 insertions(+), 66 deletions(-) diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 54f60bbc2..a6569e067 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -67,7 +67,7 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config0) -> Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), - reset_service(Config), + iotdb_reset(Config), Config. end_per_testcase(TestCase, Config) -> @@ -76,6 +76,13 @@ end_per_testcase(TestCase, Config) -> %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ +iotdb_server_url(Host, Port) -> + iolist_to_binary([ + "http://", + Host, + ":", + integer_to_binary(Port) + ]). bridge_config(TestCase, _TestGroup, Config) -> UniqueNum = integer_to_binary(erlang:unique_integer()), @@ -84,12 +91,7 @@ bridge_config(TestCase, _TestGroup, Config) -> Name = << (atom_to_binary(TestCase))/binary, UniqueNum/binary >>, - ServerURL = iolist_to_binary([ - "http://", - Host, - ":", - integer_to_binary(Port) - ]), + ServerURL = iotdb_server_url(Host, Port), ConfigString = io_lib:format( "bridges.iotdb.~s {\n" @@ -114,36 +116,19 @@ bridge_config(TestCase, _TestGroup, Config) -> ), {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. -reset_service(Config) -> - _BridgeConfig = - #{ - <<"base_url">> := BaseURL, - <<"authentication">> := #{ - <<"username">> := Username, - <<"password">> := Password - } - } = - ?config(bridge_config, Config), - ct:pal("bridge config: ~p", [_BridgeConfig]), - Path = <>, - BasicToken = base64:encode(<>), - Headers = [ - {"Content-type", "application/json"}, - {"Authorization", binary_to_list(BasicToken)} - ], - Device = iotdb_device(Config), - Body = #{sql => <<"delete from ", Device/binary, ".*">>}, - {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}). - make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ measurement => s_to_b(Measurement), data_type => s_to_b(Type), value => s_to_b(Value), device_id => DeviceId, - is_aligned => false + is_aligned => true }. +make_iotdb_payload(DeviceId, Measurement, Type, Value, Timestamp) -> + Payload = make_iotdb_payload(DeviceId, Measurement, Type, Value), + Payload#{timestamp => Timestamp}. + s_to_b(S) when is_list(S) -> list_to_binary(S); s_to_b(V) -> V. @@ -163,6 +148,39 @@ iotdb_device(Config) -> Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), <<"root.", Device/binary>>. +iotdb_request(Config, Path, Body) -> + iotdb_request(Config, Path, Body, #{}). + +iotdb_request(Config, Path, Body, Opts) -> + _BridgeConfig = + #{ + <<"base_url">> := BaseURL, + <<"authentication">> := #{ + <<"username">> := Username, + <<"password">> := Password + } + } = + ?config(bridge_config, Config), + ct:pal("bridge config: ~p", [_BridgeConfig]), + URL = <>, + BasicToken = base64:encode(<>), + Headers = [ + {"Content-type", "application/json"}, + {"Authorization", binary_to_list(BasicToken)} + ], + emqx_mgmt_api_test_util:request_api(post, URL, "", Headers, Body, Opts). + +iotdb_reset(Config) -> + Device = iotdb_device(Config), + Body = #{sql => <<"delete from ", Device/binary, ".*">>}, + {ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body). + +iotdb_query(Config, Query) -> + Path = <<"/rest/v2/query">>, + Opts = #{return_all => true}, + Body = #{sql => Query}, + iotdb_request(Config, Path, Body, Opts). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -175,7 +193,15 @@ t_sync_query_simple(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + ok = emqx_bridge_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) + ). t_async_query(Config) -> DeviceId = iotdb_device(Config), @@ -185,54 +211,110 @@ t_async_query(Config) -> fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_async_query( + ok = emqx_bridge_testlib:t_async_query( Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async + ), + Query = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), + ?assertMatch( + #{<<"values">> := [[36]]}, + emqx_utils_json:decode(IoTDBResult) ). t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), Payload = [ - make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - make_iotdb_payload(DeviceId, "temp", "INT32", 36), - make_iotdb_payload(DeviceId, "temp", "INT32", 36.7), - (make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ - timestamp => <<"1685112026296">> - }, - (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ - timestamp => 1685112026296 - }, - make_iotdb_payload(DeviceId, "temp", "INT64", "36"), - make_iotdb_payload(DeviceId, "temp", "INT64", 36), - make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), - make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), - make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), - make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), - make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"), - make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"), - make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"), - make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"), - make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0), - make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false), - make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), - make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), - make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), - make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null), - make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), - make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3), - make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87), - make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") + make_iotdb_payload(DeviceId, "temp", "INT32", "36", 1685112026290), + make_iotdb_payload(DeviceId, "temp", "INT32", 37, 1685112026291), + make_iotdb_payload(DeviceId, "temp", "INT32", 38.7, 1685112026292), + make_iotdb_payload(DeviceId, "temp", "INT32", "39", <<"1685112026293">>), + make_iotdb_payload(DeviceId, "temp", "INT64", "36", 1685112026294), + make_iotdb_payload(DeviceId, "temp", "INT64", 36, 1685112026295), + make_iotdb_payload(DeviceId, "temp", "INT64", 36.7, 1685112026296), + %% implicit 'now()' timestamp + make_iotdb_payload(DeviceId, "temp", "INT32", "40"), + %% [FIXME] neither nanoseconds nor microseconds don't seem to be supported by IoTDB + (make_iotdb_payload(DeviceId, "temp", "INT32", "41"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "42"))#{timestamp => <<"now_ns">>}, + + make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3", 1685112026290), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3, 1685112026291), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87, 1685112026292), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3", 1685112026293), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3, 1685112026294), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87, 1685112026295), + + make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1", 1685112026300), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1, 1685112026300), + make_iotdb_payload(DeviceId, "started", "BOOLEAN", true, 1685112026300), + make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true", 1685112026300), + make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE", 1685112026300), + make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True", 1685112026300), + make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0", 1685112026300), + make_iotdb_payload(DeviceId, "toasted", "BOOLEAN", 0, 1685112026300), + make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false, 1685112026300), + make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false", 1685112026300), + make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE", 1685112026300), + make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False", 1685112026300), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", null, 1685112026300), + + make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> ?assertEqual(ok, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + ok = emqx_bridge_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ), + + %% check temp + QueryTemp = <<"select temp from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, ResultTemp}} = iotdb_query(Config, QueryTemp), + ?assertMatch( + #{<<"values">> := [[36, 37, 38, 39, 36, 36, 36, 40, 41, 42]]}, + emqx_utils_json:decode(ResultTemp) + ), + %% check weight + QueryWeight = <<"select weight from ", DeviceId/binary>>, + {ok, {{_, 200, _}, _, ResultWeight}} = iotdb_query(Config, QueryWeight), + ?assertMatch( + #{<<"values">> := [[87.3, 87.3, 87.0, 87.3, 87.3, 87.0]]}, + emqx_utils_json:decode(ResultWeight) + ), + %% check rest ts = 1685112026300 + QueryRest = <<"select * from ", DeviceId/binary, " where time = 1685112026300">>, + {ok, {{_, 200, _}, _, ResultRest}} = iotdb_query(Config, QueryRest), + #{<<"values">> := Values, <<"expressions">> := Expressions} = emqx_utils_json:decode( + ResultRest + ), + Results = maps:from_list(lists:zipwith(fun(K, [V]) -> {K, V} end, Expressions, Values)), + Exp = #{ + exp(DeviceId, "charged") => true, + exp(DeviceId, "floated") => true, + exp(DeviceId, "started") => true, + exp(DeviceId, "stoked") => true, + exp(DeviceId, "enriched") => true, + exp(DeviceId, "gutted") => true, + exp(DeviceId, "drained") => false, + exp(DeviceId, "toasted") => false, + exp(DeviceId, "uncharted") => false, + exp(DeviceId, "dazzled") => false, + exp(DeviceId, "unplugged") => false, + exp(DeviceId, "unraveled") => false, + exp(DeviceId, "undecided") => null, + exp(DeviceId, "foo") => <<"bar">>, + exp(DeviceId, "temp") => null, + exp(DeviceId, "weight") => null + }, + ?assertEqual(Exp, Results), + + ok. + +exp(Dev, M0) -> + M = s_to_b(M0), + <>. t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), From 938d62a66694e5494538238a509e32b806635cb9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 30 May 2023 14:44:26 +0200 Subject: [PATCH 08/12] test: handle bad payload --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 1 + .../src/emqx_bridge_iotdb_impl.erl | 2 -- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 30 +++++++++++++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 383290027..5b4323e7c 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -116,6 +116,7 @@ create_bridge(Config, Overrides) -> Name = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + ct:pal("creating bridge with config: ~p", [BridgeConfig]), emqx_bridge:create(BridgeType, Name, BridgeConfig). create_bridge_api(Config) -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index e8eb8efb0..ec7250a8d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -403,8 +403,6 @@ device_id(Message, State) -> case maps:get(payload, Message) of #{<<"device_id">> := DeviceId} -> DeviceId; - #{device_id := DeviceId} -> - DeviceId; _NotFound -> Topic = maps:get(topic, Message), case re:replace(Topic, "/", ".", [global, {return, binary}]) of diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index a6569e067..3b3f323b0 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -326,6 +326,36 @@ t_sync_query_fail(Config) -> end, emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). +t_sync_query_badpayload(Config) -> + DeviceId = iotdb_device(Config), + BadPayload = #{foo => bar}, + IsSuccessCheck = + fun(Result) -> + ?assertEqual({error, invalid_data}, Result) + end, + emqx_bridge_testlib:t_sync_query( + Config, + make_message_fun(DeviceId, BadPayload), + IsSuccessCheck, + iotdb_bridge_on_query + ), + ok. + +t_async_query_badpayload(Config) -> + DeviceId = iotdb_device(Config), + BadPayload = #{foo => bar}, + IsSuccessCheck = + fun(Result) -> + ?assertEqual({error, invalid_data}, Result) + end, + emqx_bridge_testlib:t_async_query( + Config, + make_message_fun(DeviceId, BadPayload), + IsSuccessCheck, + iotdb_bridge_on_query_async + ), + ok. + t_create_via_http(Config) -> emqx_bridge_testlib:t_create_via_http(Config). From 767f7c57e74399015ea0b8fcb752725ec184d09e Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 30 May 2023 15:54:17 +0200 Subject: [PATCH 09/12] test: check device_id is used from topic or config --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 2 +- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 127 ++++++++++++++---- 2 files changed, 103 insertions(+), 26 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 5b4323e7c..025451988 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -32,7 +32,7 @@ init_per_group(TestGroup, BridgeType, Config) -> {ok, _} = application:ensure_all_started(emqx_connector), emqx_mgmt_api_test_util:init_suite(), UniqueNum = integer_to_binary(erlang:unique_integer([positive])), - MQTTTopic = <<"mqtt/topic/", UniqueNum/binary>>, + MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>, [ {proxy_host, ProxyHost}, {proxy_port, ProxyPort}, diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 3b3f323b0..0811807db 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -8,6 +8,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(BRIDGE_TYPE_BIN, <<"iotdb">>). -define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_iotdb]). @@ -143,9 +144,15 @@ make_message_fun(Topic, Payload) -> } end. +iotdb_topic(Config) -> + ?config(mqtt_topic, Config). + iotdb_device(Config) -> - MQTTTopic = ?config(mqtt_topic, Config), - Device = re:replace(MQTTTopic, "/", ".dev", [global, {return, binary}]), + Topic = iotdb_topic(Config), + topic_to_iotdb_device(Topic). + +topic_to_iotdb_device(Topic) -> + Device = re:replace(Topic, "/", ".", [global, {return, binary}]), <<"root.", Device/binary>>. iotdb_request(Config, Path, Body) -> @@ -172,6 +179,9 @@ iotdb_request(Config, Path, Body, Opts) -> iotdb_reset(Config) -> Device = iotdb_device(Config), + iotdb_reset(Config, Device). + +iotdb_reset(Config, Device) -> Body = #{sql => <<"delete from ", Device/binary, ".*">>}, {ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body). @@ -181,6 +191,12 @@ iotdb_query(Config, Query) -> Body = #{sql => Query}, iotdb_request(Config, Path, Body, Opts). +is_success_check({ok, 200, _, Body}) -> + ?assert(is_code(200, emqx_utils_json:decode(Body))). + +is_code(Code, #{<<"code">> := Code}) -> true; +is_code(_, _) -> false. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -188,13 +204,9 @@ iotdb_query(Config, Query) -> t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_sync_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), Query = <<"select temp from ", DeviceId/binary>>, {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), @@ -206,13 +218,9 @@ t_sync_query_simple(Config) -> t_async_query(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_async_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query_async + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async ), Query = <<"select temp from ", DeviceId/binary>>, {ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query), @@ -260,13 +268,9 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], - MakeMessageFun = make_message_fun(DeviceId, Payload), - IsSuccessCheck = - fun(Result) -> - ?assertEqual(ok, element(1, Result)) - end, + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), ok = emqx_bridge_testlib:t_sync_query( - Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), %% check temp @@ -319,7 +323,7 @@ exp(Dev, M0) -> t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"), - MakeMessageFun = make_message_fun(DeviceId, Payload), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), IsSuccessCheck = fun(Result) -> ?assertEqual(error, element(1, Result)) @@ -327,7 +331,6 @@ t_sync_query_fail(Config) -> emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). t_sync_query_badpayload(Config) -> - DeviceId = iotdb_device(Config), BadPayload = #{foo => bar}, IsSuccessCheck = fun(Result) -> @@ -335,14 +338,13 @@ t_sync_query_badpayload(Config) -> end, emqx_bridge_testlib:t_sync_query( Config, - make_message_fun(DeviceId, BadPayload), + make_message_fun(iotdb_topic(Config), BadPayload), IsSuccessCheck, iotdb_bridge_on_query ), ok. t_async_query_badpayload(Config) -> - DeviceId = iotdb_device(Config), BadPayload = #{foo => bar}, IsSuccessCheck = fun(Result) -> @@ -350,7 +352,7 @@ t_async_query_badpayload(Config) -> end, emqx_bridge_testlib:t_async_query( Config, - make_message_fun(DeviceId, BadPayload), + make_message_fun(iotdb_topic(Config), BadPayload), IsSuccessCheck, iotdb_bridge_on_query_async ), @@ -364,3 +366,78 @@ t_start_stop(Config) -> t_on_get_status(Config) -> emqx_bridge_testlib:t_on_get_status(Config). + +t_device_id(Config) -> + ResourceId = emqx_bridge_testlib:resource_id(Config), + %% Create without device_id configured + ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + ConfiguredDevice = <<"root.someOtherDevice234">>, + DeviceId = <<"root.deviceFooBar123">>, + Topic = <<"some/random/topic">>, + TopicDevice = topic_to_iotdb_device(Topic), + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), + MessageF1 = make_message_fun(Topic, Payload1), + ?assertNotEqual(DeviceId, TopicDevice), + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + ), + {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), + ?assertNotEqual([], Values1_1), + {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2), + ?assertEqual([], Values1_2), + + %% test without device_id in message, taking it from topic + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)), + MessageF2 = make_message_fun(Topic, Payload2), + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()}) + ), + {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), + ?assertEqual([], Values2_1), + {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), + ?assertNotEqual([], Values2_2), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + + %% reconfigure bridge with device_id + {ok, _} = + emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}), + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + ), + + %% even though we had a device_id in the message it's not being used + {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1), + ?assertEqual([], Values3_1), + {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2), + ?assertEqual([], Values3_2), + {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query( + Config, <<"select * from ", ConfiguredDevice/binary>> + ), + #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3), + ?assertNotEqual([], Values3_3), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TopicDevice), + iotdb_reset(Config, ConfiguredDevice), + ok. From d775a1e16ce9aa426941f31d9fd6dc9db6243d08 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 30 May 2023 16:32:38 +0200 Subject: [PATCH 10/12] test: add iotdb-0.13.4 container --- .../docker-compose-iotdb.yaml | 29 +++++++++++++++++++ .ci/docker-compose-file/toxiproxy.json | 6 ++++ 2 files changed, 35 insertions(+) diff --git a/.ci/docker-compose-file/docker-compose-iotdb.yaml b/.ci/docker-compose-file/docker-compose-iotdb.yaml index 2e1ea881e..ec050b9ff 100644 --- a/.ci/docker-compose-file/docker-compose-iotdb.yaml +++ b/.ci/docker-compose-file/docker-compose-iotdb.yaml @@ -29,3 +29,32 @@ services: # - "18080:18080" networks: - emqx_bridge + + iotdb_0_13: + container_name: iotdb_0_13 + hostname: iotdb013 + image: apache/iotdb:0.13.4-node + restart: always + environment: + - enable_rest_service=true + - cn_internal_address=iotdb013 + - cn_internal_port=10710 + - cn_consensus_port=10720 + - cn_target_config_node_list=iotdb013:10710 + - dn_rpc_address=iotdb013 + - dn_internal_address=iotdb013 + - dn_rpc_port=6667 + - dn_mpp_data_exchange_port=10740 + - dn_schema_region_consensus_port=10750 + - dn_data_region_consensus_port=10760 + - dn_target_config_node_list=iotdb013:10710 + # volumes: + # - ./data:/iotdb/data + # - ./logs:/iotdb/logs + expose: + - "18080" + # IoTDB's REST interface, uncomment for local testing + # ports: + # - "18080:18080" + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index c266b2792..8695acde9 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -132,6 +132,12 @@ "upstream": "iotdb:18080", "enabled": true }, + { + "name": "iotdb013", + "listen": "0.0.0.0:38080", + "upstream": "iotdb013:18080", + "enabled": true + }, { "name": "minio_tcp", "listen": "0.0.0.0:19000", From 6c0fb0e2eaa9bb3c7b820fcc4b782ab41eee7238 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 31 May 2023 11:30:13 +0200 Subject: [PATCH 11/12] test: run tests against iotdb013 container --- .../docker-compose-iotdb.yaml | 5 +- .../docker-compose-toxiproxy.yaml | 1 + .../iotdb013/iotdb-rest.properties | 58 ++++++++++++++++ .../test/emqx_bridge_iotdb_impl_SUITE.erl | 69 +++++++++++++++---- 4 files changed, 117 insertions(+), 16 deletions(-) create mode 100644 .ci/docker-compose-file/iotdb013/iotdb-rest.properties diff --git a/.ci/docker-compose-file/docker-compose-iotdb.yaml b/.ci/docker-compose-file/docker-compose-iotdb.yaml index ec050b9ff..2a2b0e603 100644 --- a/.ci/docker-compose-file/docker-compose-iotdb.yaml +++ b/.ci/docker-compose-file/docker-compose-iotdb.yaml @@ -31,7 +31,7 @@ services: - emqx_bridge iotdb_0_13: - container_name: iotdb_0_13 + container_name: iotdb013 hostname: iotdb013 image: apache/iotdb:0.13.4-node restart: always @@ -48,7 +48,8 @@ services: - dn_schema_region_consensus_port=10750 - dn_data_region_consensus_port=10760 - dn_target_config_node_list=iotdb013:10710 - # volumes: + volumes: + - ./iotdb013/iotdb-rest.properties:/iotdb/conf/iotdb-rest.properties # - ./data:/iotdb/data # - ./logs:/iotdb/logs expose: diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 88c2cb61a..f15e779db 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -46,6 +46,7 @@ services: # IOTDB - 14242:4242 - 28080:18080 + - 38080:38080 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/iotdb013/iotdb-rest.properties b/.ci/docker-compose-file/iotdb013/iotdb-rest.properties new file mode 100644 index 000000000..75d0ae1b0 --- /dev/null +++ b/.ci/docker-compose-file/iotdb013/iotdb-rest.properties @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#################### +### REST Service Configuration +#################### + +# Is the REST service enabled +enable_rest_service=true + +# the binding port of the REST service +# rest_service_port=18080 + +# the default row limit to a REST query response when the rowSize parameter is not given in request +# rest_query_default_row_size_limit=10000 + +# the expiration time of the user login information cache (in seconds) +# cache_expire_in_seconds=28800 + +# maximum number of users can be stored in the user login cache. +# cache_max_num=100 + +# init capacity of users can be stored in the user login cache. +# cache_init_num=10 + +# is SSL enabled +# enable_https=false + +# SSL key store path +# key_store_path= + +# SSL key store password +# key_store_pwd= + +# SSL trust store path +# trust_store_path= + +# SSL trust store password. +# trust_store_pwd= + +# SSL timeout (in seconds) +# idle_timeout_in_seconds=50000 diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 0811807db..55d3743d4 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -6,6 +6,7 @@ -compile(nowarn_export_all). -compile(export_all). +-include("emqx_bridge_iotdb.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -19,13 +20,15 @@ all() -> [ - {group, plain} + {group, plain}, + {group, legacy} ]. groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), [ - {plain, AllTCs} + {plain, AllTCs}, + {legacy, AllTCs} ]. init_per_suite(Config) -> @@ -44,7 +47,32 @@ init_per_group(plain = Type, Config0) -> [ {bridge_host, Host}, {bridge_port, Port}, - {proxy_name, ProxyName} + {proxy_name, ProxyName}, + {iotdb_version, ?VSN_1_1_X}, + {iotdb_rest_prefix, <<"/rest/v2/">>} + | Config + ]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_iotdb); + _ -> + {skip, no_iotdb} + end + end; +init_per_group(legacy = Type, Config0) -> + Host = os:getenv("IOTDB_LEGACY_HOST", "toxiproxy.emqx.net"), + Port = list_to_integer(os:getenv("IOTDB_LEGACY_PORT", "38080")), + ProxyName = "iotdb013", + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + [ + {bridge_host, Host}, + {bridge_port, Port}, + {proxy_name, ProxyName}, + {iotdb_version, ?VSN_0_13_X}, + {iotdb_rest_prefix, <<"/rest/v1/">>} | Config ]; false -> @@ -59,7 +87,8 @@ init_per_group(_Group, Config) -> Config. end_per_group(Group, Config) when - Group =:= plain + Group =:= plain; + Group =:= legacy -> emqx_bridge_testlib:end_per_group(Config), ok; @@ -89,6 +118,7 @@ bridge_config(TestCase, _TestGroup, Config) -> UniqueNum = integer_to_binary(erlang:unique_integer()), Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), + Version = ?config(iotdb_version, Config), Name = << (atom_to_binary(TestCase))/binary, UniqueNum/binary >>, @@ -102,6 +132,7 @@ bridge_config(TestCase, _TestGroup, Config) -> " username = \"root\"\n" " password = \"root\"\n" " }\n" + "iotdb_version = \"~s\"\n" " pool_size = 1\n" " resource_opts = {\n" " auto_restart_interval = 5000\n" @@ -112,7 +143,8 @@ bridge_config(TestCase, _TestGroup, Config) -> "}\n", [ Name, - ServerURL + ServerURL, + Version ] ), {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}. @@ -182,11 +214,13 @@ iotdb_reset(Config) -> iotdb_reset(Config, Device). iotdb_reset(Config, Device) -> + Prefix = ?config(iotdb_rest_prefix, Config), Body = #{sql => <<"delete from ", Device/binary, ".*">>}, - {ok, _} = iotdb_request(Config, <<"/rest/v2/nonQuery">>, Body). + {ok, _} = iotdb_request(Config, <>, Body). iotdb_query(Config, Query) -> - Path = <<"/rest/v2/query">>, + Prefix = ?config(iotdb_rest_prefix, Config), + Path = <>, Opts = #{return_all => true}, Body = #{sql => Query}, iotdb_request(Config, Path, Body, Opts). @@ -390,11 +424,13 @@ t_device_id(Config) -> emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) ), {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), - ?assertNotEqual([], Values1_1), + ?assertNot(is_empty(Values1_1)), {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), + ct:pal("topic device result: ~p", [emqx_utils_json:decode(Res1_2)]), #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2), - ?assertEqual([], Values1_2), + ?assert(is_empty(Values1_2)), %% test without device_id in message, taking it from topic iotdb_reset(Config, DeviceId), @@ -407,10 +443,10 @@ t_device_id(Config) -> ), {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), - ?assertEqual([], Values2_1), + ?assert(is_empty(Values2_1)), {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), - ?assertNotEqual([], Values2_2), + ?assertNot(is_empty(Values2_2)), iotdb_reset(Config, DeviceId), iotdb_reset(Config, TopicDevice), @@ -427,17 +463,22 @@ t_device_id(Config) -> %% even though we had a device_id in the message it's not being used {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1), - ?assertEqual([], Values3_1), + ?assert(is_empty(Values3_1)), {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2), - ?assertEqual([], Values3_2), + ?assert(is_empty(Values3_2)), {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query( Config, <<"select * from ", ConfiguredDevice/binary>> ), #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3), - ?assertNotEqual([], Values3_3), + ?assertNot(is_empty(Values3_3)), iotdb_reset(Config, DeviceId), iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), ok. + +is_empty(null) -> true; +is_empty([]) -> true; +is_empty([[]]) -> true; +is_empty(_) -> false. From 1e4cee05df2fe14cf005ad602d602e0d6d55bf74 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 31 May 2023 15:57:22 +0200 Subject: [PATCH 12/12] fix(emqx_bridge_iotdb): handle rule engine passed payload also remove topic logic as it's duplicated functionality via rule engine --- .../src/emqx_bridge_iotdb_impl.erl | 47 +++++----- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 89 ++++++++----------- 2 files changed, 58 insertions(+), 78 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index ec7250a8d..4e64f07b5 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -153,10 +153,17 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %% Internal Functions %%-------------------------------------------------------------------- -make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> - emqx_utils_json:decode(PayloadUnparsed); -make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> - lists:map(fun make_parsed_payload/1, PayloadUnparsed). +get_payload(#{payload := Payload}) -> + Payload; +get_payload(#{<<"payload">> := Payload}) -> + Payload. + +parse_payload(ParsedPayload) when is_map(ParsedPayload) -> + ParsedPayload; +parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) -> + emqx_utils_json:decode(UnparsedPayload); +parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) -> + lists:map(fun parse_payload/1, UnparsedPayloads). preproc_data_list(DataList) -> lists:foldl( @@ -297,16 +304,16 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(MessageUnparsedPayload, State) -> - Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload), +make_iotdb_insert_request(Message, State) -> + Payloads = to_list(parse_payload(get_payload(Message))), IsAligned = maps:get(is_aligned, State, false), - DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), - Payload = make_list(maps:get(payload, Message)), - case preproc_data_list(Payload) of - [] -> + case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of + {undefined, _} -> + {error, device_id_missing}; + {_, []} -> {error, invalid_data}; - PreProcessedData -> + {DeviceId, PreProcessedData} -> DataList = proc_data(PreProcessedData, Message), InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), @@ -394,22 +401,14 @@ iotdb_field_key(data_types, ?VSN_1_0_X) -> iotdb_field_key(data_types, ?VSN_0_13_X) -> <<"dataTypes">>. -make_list(List) when is_list(List) -> List; -make_list(Data) -> [Data]. +to_list(List) when is_list(List) -> List; +to_list(Data) -> [Data]. -device_id(Message, State) -> +device_id(Message, Payloads, State) -> case maps:get(device_id, State, undefined) of undefined -> - case maps:get(payload, Message) of - #{<<"device_id">> := DeviceId} -> - DeviceId; - _NotFound -> - Topic = maps:get(topic, Message), - case re:replace(Topic, "/", ".", [global, {return, binary}]) of - <<"root.", _/binary>> = Device -> Device; - Device -> <<"root.", Device/binary>> - end - end; + %% [FIXME] there could be conflicting device-ids in the Payloads + maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceId -> DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 55d3743d4..d89b870d0 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -231,6 +231,11 @@ is_success_check({ok, 200, _, Body}) -> is_code(Code, #{<<"code">> := Code}) -> true; is_code(_, _) -> false. +is_error_check(Reason) -> + fun(Result) -> + ?assertEqual({error, Reason}, Result) + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -364,33 +369,37 @@ t_sync_query_fail(Config) -> end, emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). -t_sync_query_badpayload(Config) -> - BadPayload = #{foo => bar}, - IsSuccessCheck = - fun(Result) -> - ?assertEqual({error, invalid_data}, Result) - end, +t_sync_device_id_missing(Config) -> emqx_bridge_testlib:t_sync_query( Config, - make_message_fun(iotdb_topic(Config), BadPayload), - IsSuccessCheck, + make_message_fun(iotdb_topic(Config), #{foo => bar}), + is_error_check(device_id_missing), iotdb_bridge_on_query - ), - ok. + ). -t_async_query_badpayload(Config) -> - BadPayload = #{foo => bar}, - IsSuccessCheck = - fun(Result) -> - ?assertEqual({error, invalid_data}, Result) - end, +t_sync_invalid_data(Config) -> + emqx_bridge_testlib:t_sync_query( + Config, + make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), + is_error_check(invalid_data), + iotdb_bridge_on_query + ). + +t_async_device_id_missing(Config) -> emqx_bridge_testlib:t_async_query( Config, - make_message_fun(iotdb_topic(Config), BadPayload), - IsSuccessCheck, + make_message_fun(iotdb_topic(Config), #{foo => bar}), + is_error_check(device_id_missing), iotdb_bridge_on_query_async - ), - ok. + ). + +t_async_invalid_data(Config) -> + emqx_bridge_testlib:t_async_query( + Config, + make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), + is_error_check(invalid_data), + iotdb_bridge_on_query_async + ). t_create_via_http(Config) -> emqx_bridge_testlib:t_create_via_http(Config). @@ -413,13 +422,10 @@ t_device_id(Config) -> ConfiguredDevice = <<"root.someOtherDevice234">>, DeviceId = <<"root.deviceFooBar123">>, Topic = <<"some/random/topic">>, - TopicDevice = topic_to_iotdb_device(Topic), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), MessageF1 = make_message_fun(Topic, Payload1), - ?assertNotEqual(DeviceId, TopicDevice), is_success_check( emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) ), @@ -427,29 +433,8 @@ t_device_id(Config) -> ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), ?assertNot(is_empty(Values1_1)), - {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - ct:pal("topic device result: ~p", [emqx_utils_json:decode(Res1_2)]), - #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2), - ?assert(is_empty(Values1_2)), - - %% test without device_id in message, taking it from topic - iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), - iotdb_reset(Config, ConfiguredDevice), - Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)), - MessageF2 = make_message_fun(Topic, Payload2), - is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()}) - ), - {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), - #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), - ?assert(is_empty(Values2_1)), - {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), - ?assertNot(is_empty(Values2_2)), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), %% reconfigure bridge with device_id @@ -461,20 +446,16 @@ t_device_id(Config) -> ), %% even though we had a device_id in the message it's not being used - {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), - #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1), - ?assert(is_empty(Values3_1)), - {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2), - ?assert(is_empty(Values3_2)), - {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query( + {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), + ?assert(is_empty(Values2_1)), + {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query( Config, <<"select * from ", ConfiguredDevice/binary>> ), - #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3), - ?assertNot(is_empty(Values3_3)), + #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), + ?assertNot(is_empty(Values2_2)), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), ok.