fix(emqx_bridge_iotdb): allow integer timestamp
This commit is contained in:
parent
bd92116cee
commit
a3021c58f1
|
@ -175,7 +175,7 @@ preproc_data(
|
||||||
) ->
|
) ->
|
||||||
[
|
[
|
||||||
#{
|
#{
|
||||||
timestamp => emqx_plugin_libs_rule:preproc_tmpl(
|
timestamp => maybe_preproc_tmpl(
|
||||||
maps:get(<<"timestamp">>, Data, <<"now">>)
|
maps:get(<<"timestamp">>, Data, <<"now">>)
|
||||||
),
|
),
|
||||||
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
|
||||||
|
@ -217,9 +217,7 @@ proc_data(PreProcessedData, Msg) ->
|
||||||
}
|
}
|
||||||
) ->
|
) ->
|
||||||
#{
|
#{
|
||||||
timestamp => iot_timestamp(
|
timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
||||||
emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows
|
|
||||||
),
|
|
||||||
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
|
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Msg),
|
||||||
data_type => DataType,
|
data_type => DataType,
|
||||||
value => proc_value(DataType, ValueTkn, Msg)
|
value => proc_value(DataType, ValueTkn, Msg)
|
||||||
|
@ -228,6 +226,11 @@ proc_data(PreProcessedData, Msg) ->
|
||||||
PreProcessedData
|
PreProcessedData
|
||||||
).
|
).
|
||||||
|
|
||||||
|
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
|
||||||
|
Timestamp;
|
||||||
|
iot_timestamp(TimestampTkn, Msg, Nows) ->
|
||||||
|
iot_timestamp(emqx_plugin_libs_rule:proc_tmpl(TimestampTkn, Msg), Nows).
|
||||||
|
|
||||||
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
|
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
|
||||||
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
|
Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
|
||||||
->
|
->
|
||||||
|
|
|
@ -135,9 +135,6 @@ reset_service(Config) ->
|
||||||
Body = #{sql => <<"delete from ", Device/binary, ".*">>},
|
Body = #{sql => <<"delete from ", Device/binary, ".*">>},
|
||||||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}).
|
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path, "", Headers, Body, #{}).
|
||||||
|
|
||||||
make_iotdb_payload(DeviceId) ->
|
|
||||||
make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36").
|
|
||||||
|
|
||||||
make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
|
make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
|
||||||
#{
|
#{
|
||||||
measurement => s_to_b(Measurement),
|
measurement => s_to_b(Measurement),
|
||||||
|
@ -203,14 +200,12 @@ t_sync_query_aggregated(Config) ->
|
||||||
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{
|
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{
|
||||||
timestamp => <<"1685112026296">>
|
timestamp => <<"1685112026296">>
|
||||||
},
|
},
|
||||||
|
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{
|
||||||
|
timestamp => 1685112026296
|
||||||
|
},
|
||||||
make_iotdb_payload(DeviceId, "temp", "INT64", "36"),
|
make_iotdb_payload(DeviceId, "temp", "INT64", "36"),
|
||||||
make_iotdb_payload(DeviceId, "temp", "INT64", 36),
|
make_iotdb_payload(DeviceId, "temp", "INT64", 36),
|
||||||
make_iotdb_payload(DeviceId, "temp", "INT64", 36.7),
|
make_iotdb_payload(DeviceId, "temp", "INT64", 36.7),
|
||||||
(make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>},
|
|
||||||
(make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>},
|
|
||||||
(make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{
|
|
||||||
timestamp => <<"1685112026296">>
|
|
||||||
},
|
|
||||||
make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"),
|
make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"),
|
||||||
make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1),
|
make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1),
|
||||||
make_iotdb_payload(DeviceId, "started", "BOOLEAN", true),
|
make_iotdb_payload(DeviceId, "started", "BOOLEAN", true),
|
||||||
|
@ -227,6 +222,9 @@ t_sync_query_aggregated(Config) ->
|
||||||
make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"),
|
make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"),
|
||||||
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3),
|
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3),
|
||||||
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87),
|
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87),
|
||||||
|
make_iotdb_payload(DeviceId, "weight", "DOUBLE", "87.3"),
|
||||||
|
make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87.3),
|
||||||
|
make_iotdb_payload(DeviceId, "weight", "DOUBLE", 87),
|
||||||
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar")
|
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar")
|
||||||
],
|
],
|
||||||
MakeMessageFun = make_message_fun(DeviceId, Payload),
|
MakeMessageFun = make_message_fun(DeviceId, Payload),
|
||||||
|
|
Loading…
Reference in New Issue