diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index 667266c82..289b8f027 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -192,7 +192,7 @@ preproc_data( ), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) + value => maybe_preproc_tmpl(Value) } | Acc ]; @@ -207,6 +207,11 @@ preproc_data(_NoMatch, Acc) -> ), Acc. +maybe_preproc_tmpl(Value) when is_binary(Value) -> + emqx_plugin_libs_rule:preproc_tmpl(Value); +maybe_preproc_tmpl(Value) -> + Value. + proc_data(PreProcessedData, Msg) -> NowNS = erlang:system_time(nanosecond), Nows = #{ diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 5062fb6b9..65bb977d1 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -140,13 +140,16 @@ make_iotdb_payload(DeviceId) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ - measurement => Measurement, - data_type => Type, - value => Value, + measurement => s_to_b(Measurement), + data_type => s_to_b(Type), + value => s_to_b(Value), device_id => DeviceId, is_aligned => false }. +s_to_b(S) when is_list(S) -> list_to_binary(S); +s_to_b(V) -> V. + make_message_fun(Topic, Payload) -> fun() -> MsgId = erlang:unique_integer([positive]), @@ -169,7 +172,7 @@ iotdb_device(Config) -> t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> @@ -179,7 +182,7 @@ t_sync_query_simple(Config) -> t_async_query(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) -> @@ -192,22 +195,39 @@ t_async_query(Config) -> t_sync_query_aggregated(Config) -> DeviceId = iotdb_device(Config), Payload = [ - make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"now_us">>}, - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"now_ns">>}, - (make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{ + make_iotdb_payload(DeviceId, "temp", "INT32", "36"), + make_iotdb_payload(DeviceId, "temp", "INT32", 36), + make_iotdb_payload(DeviceId, "temp", "INT32", 36.7), + (make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>}, + (make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{ timestamp => <<"1685112026296">> }, - make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"), - make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"), - make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>), - make_iotdb_payload(DeviceId, "gutted", <<"BOOLEAN">>, <<"True">>), - make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"), - make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"), - make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>), - make_iotdb_payload(DeviceId, "unraveled", <<"BOOLEAN">>, <<"False">>), - make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"), - make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>) + make_iotdb_payload(DeviceId, "temp", "INT64", "36"), + make_iotdb_payload(DeviceId, "temp", "INT64", 36), + make_iotdb_payload(DeviceId, "temp", "INT64", 36.7), + (make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>}, + (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>}, + (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{ + timestamp => <<"1685112026296">> + }, + make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1), + make_iotdb_payload(DeviceId, "started", "BOOLEAN", true), + make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"), + make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"), + make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"), + make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"), + make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0), + make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false), + make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"), + make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"), + make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"), + make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", undefined), + make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3), + make_iotdb_payload(DeviceId, "weight", "FLOAT", 87), + make_iotdb_payload(DeviceId, "foo", "TEXT", "bar") ], MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = @@ -218,7 +238,7 @@ t_sync_query_aggregated(Config) -> t_sync_query_fail(Config) -> DeviceId = iotdb_device(Config), - Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"), MakeMessageFun = make_message_fun(DeviceId, Payload), IsSuccessCheck = fun(Result) ->