diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 2146fb51c..e8eb8efb0 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -175,7 +175,7 @@ preproc_data( ) -> [ #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( + timestamp => maybe_preproc_tmpl( maps:get(<<"timestamp">>, Data, <<"now">>) ), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), @@ -217,9 +217,7 @@ proc_data(PreProcessedData, Msg) -> } ) -> #{ - timestamp => iot_timestamp( - emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows - ), + timestamp => iot_timestamp(TimestampTkn, Msg, Nows), measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg), data_type => DataType, value => proc_value(DataType, ValueTkn, Msg) @@ -228,6 +226,11 @@ proc_data(PreProcessedData, Msg) -> PreProcessedData ). +iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> + Timestamp; +iot_timestamp(TimestampTkn, Msg, Nows) -> + iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows). + iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 0a6f7d777..54f60bbc2 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -135,9 +135,6 @@ reset_service(Config) -> Body = #{sql => <<"delete from ", Device/binary, ".*">>}, {ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}). -make_iotdb_payload(DeviceId) -> - make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"). - make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ measurement => s_to_b(Measurement), @@ -203,14 +200,12 @@ t_sync_query_aggregated(Config) -> (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ timestamp => <<"1685112026296">> }, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ + timestamp => 1685112026296 + }, make_iotdb_payload(DeviceId, "temp", "INT64", "36"), make_iotdb_payload(DeviceId, "temp", "INT64", 36), make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), - (make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{ - timestamp => <<"1685112026296">> - }, make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), @@ -227,6 +222,9 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3), + make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87), make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") ], MakeMessageFun = make_message_fun(DeviceId, Payload),