diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 9c5108307..cebf60cb1 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 2f8794560..416e19a3a 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -143,24 +143,42 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %% Internal Functions %%-------------------------------------------------------------------- -preproc_data(DataList) -> +make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> + emqx_utils_json:decode(PayloadUnparsed, [return_maps]); +make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> + lists:map(fun make_parsed_payload/1, PayloadUnparsed); +make_parsed_payload( + #{ + measurement := Measurement, + data_type := DataType, + value := Value + } = Data +) -> + Data#{ + <<"measurement">> => Measurement, + <<"data_type">> => DataType, + <<"value">> => Value + }. + +preproc_data( + #{ + <<"measurement">> := Measurement, + <<"data_type">> := DataType, + <<"value">> := Value + } = Data +) -> + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + }. + +preproc_data_list(DataList) -> lists:map( - fun( - #{ - measurement := Measurement, - data_type := DataType, - value := Value - } = Data - ) -> - #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( - maps:get(<<"timestamp">>, Data, <<"now">>) - ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) - } - end, + fun preproc_data/1, DataList ). @@ -258,12 +276,15 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(Message, State) -> +make_iotdb_insert_request(MessageUnparsedPayload, State) -> + PayloadUnparsed = maps:get(payload, MessageUnparsedPayload), + PayloadParsed = make_parsed_payload(PayloadUnparsed), + Message = MessageUnparsedPayload#{payload => PayloadParsed}, IsAligned = maps:get(is_aligned, State, false), DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), Payload = make_list(maps:get(payload, Message)), - PreProcessedData = preproc_data(Payload), + PreProcessedData = preproc_data_list(Payload), DataList = proc_data(PreProcessedData, Message), InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), @@ -350,6 +371,8 @@ device_id(Message, State) -> case maps:get(device_id, State, undefined) of undefined -> case maps:get(payload, Message) of + #{<<"device_id">> := DeviceId} -> + DeviceId; #{device_id := DeviceId} -> DeviceId; _NotFound ->