From d19ddb1832fbc7c15e900e9f047ed6097e369518 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 22 May 2023 13:32:49 +0200 Subject: [PATCH 1/3] fix: IoTDB bridge incoming payload needs to be parsed as JSON There was an incorrect assumption that the data incoming to the IoTDB bridge has already been parsed. This is fixed by parsing the payload as JSON data if the payload is not already a map. Fixes: https://emqx.atlassian.net/browse/EMQX-9854 --- .../src/emqx_bridge_iotdb.app.src | 2 +- .../src/emqx_bridge_iotdb_impl.erl | 61 +++++++++++++------ 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 9c5108307..cebf60cb1 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 2f8794560..416e19a3a 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %% Internal Functions %%-------------------------------------------------------------------- -preproc_data(DataList) -> +make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> + emqx_utils_json:decode(PayloadUnparsed, [return_maps]); +make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> + lists:map(fun make_parsed_payload/1, PayloadUnparsed); +make_parsed_payload( + #{ + measurement := Measurement, + data_type := DataType, + value := Value + } = Data +) -> + Data#{ + <<"measurement">> => Measurement, + <<"data_type">> => DataType, + <<"value">> => Value + }. + +preproc_data( + #{ + <<"measurement">> := Measurement, + <<"data_type">> := DataType, + <<"value">> := Value + } = Data +) -> + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + }. + +preproc_data_list(DataList) -> lists:map( - fun( - #{ - measurement := Measurement, - data_type := DataType, - value := Value - } = Data - ) -> - #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( - maps:get(<<"timestamp">>, Data, <<"now">>) - ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) - } - end, + fun preproc_data/1, DataList ). @@ -258,12 +276,15 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(Message, State) -> +make_iotdb_insert_request(MessageUnparsedPayload, State) -> + PayloadUnparsed = maps:get(payload, MessageUnparsedPayload), + PayloadParsed = make_parsed_payload(PayloadUnparsed), + Message = MessageUnparsedPayload#{payload => PayloadParsed}, IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), Payload = make_list(maps:get(payload, Message)), - PreProcessedData = preproc_data(Payload), + PreProcessedData = preproc_data_list(Payload), DataList = proc_data(PreProcessedData, Message), InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), @@ -350,6 +371,8 @@ device_id(Message, State) -> case maps:get(device_id, State, undefined) of undefined -> case maps:get(payload, Message) of + #{<<"device_id">> := DeviceId} -> + DeviceId; #{device_id := DeviceId} -> DeviceId; _NotFound -> From 89ea1646bedacd18e4fe5991874a5db435ab3d7d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 May 2023 10:57:05 +0200 Subject: [PATCH 2/3] fix: IoTDB name for version option The previous name for the version option was v1.0.x which is clearly wrong since this option was tested against IoTDB version v1.1.0. This commit fixes this by renaming the option to v1.x. Fixes: https://emqx.atlassian.net/browse/EMQX-9926 --- apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl | 2 +- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl | 6 +++--- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl index 5e6bf9ac5..5d693547a 100644 --- a/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl +++ b/apps/emqx_bridge_iotdb/include/emqx_bridge_iotdb.hrl @@ -5,7 +5,7 @@ -ifndef(EMQX_BRIDGE_IOTDB_HRL). -define(EMQX_BRIDGE_IOTDB_HRL, true). --define(VSN_1_0_X, 'v1.0.x'). +-define(VSN_1_X, 'v1.x'). -define(VSN_0_13_X, 'v0.13.x'). -endif. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 90e8d18a4..aa2c32589 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -109,10 +109,10 @@ basic_config() -> )}, {iotdb_version, mk( - hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]), + hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]), #{ desc => ?DESC("config_iotdb_version"), - default => ?VSN_1_0_X + default => ?VSN_1_X } )} ] ++ resource_creation_opts() ++ @@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) -> is_aligned => false, device_id => <<"my_device">>, base_url => <<"http://iotdb.local:18080/">>, - iotdb_version => ?VSN_1_0_X, + iotdb_version => ?VSN_1_X, connect_timeout => <<"15s">>, pool_type => <<"random">>, pool_size => 8, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 416e19a3a..9014fde0b 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -282,7 +282,7 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) -> Message = MessageUnparsedPayload#{payload => PayloadParsed}, IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), - IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), + IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X), Payload = make_list(maps:get(payload, Message)), PreProcessedData = preproc_data_list(Payload), DataList = proc_data(PreProcessedData, Message), @@ -351,15 +351,15 @@ insert_value(1, Data, [Value | Values]) -> insert_value(Index, Data, [Value | Values]) -> [[null | Value] | insert_value(Index - 1, Data, Values)]. -iotdb_field_key(is_aligned, ?VSN_1_0_X) -> +iotdb_field_key(is_aligned, ?VSN_1_X) -> <<"is_aligned">>; iotdb_field_key(is_aligned, ?VSN_0_13_X) -> <<"isAligned">>; -iotdb_field_key(device_id, ?VSN_1_0_X) -> +iotdb_field_key(device_id, ?VSN_1_X) -> <<"device">>; iotdb_field_key(device_id, ?VSN_0_13_X) -> <<"deviceId">>; -iotdb_field_key(data_types, ?VSN_1_0_X) -> +iotdb_field_key(data_types, ?VSN_1_X) -> <<"data_types">>; iotdb_field_key(data_types, ?VSN_0_13_X) -> <<"dataTypes">>. From 63180c87be875e651c3781b742715cbb4c2d9a16 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 23 May 2023 18:53:14 +0200 Subject: [PATCH 3/3] style: simplify code for better readability Co-authored-by: Thales Macedo Garitezi --- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 9014fde0b..8331e715f 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -277,9 +277,7 @@ convert_float(undefined) -> null. make_iotdb_insert_request(MessageUnparsedPayload, State) -> - PayloadUnparsed = maps:get(payload, MessageUnparsedPayload), - PayloadParsed = make_parsed_payload(PayloadUnparsed), - Message = MessageUnparsedPayload#{payload => PayloadParsed}, + Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload), IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),