fix(emqx_bridge_iotdb): allow non-binary values

This commit is contained in:
Stefan Strigler 2023-05-29 12:07:18 +02:00
parent 64d582770d
commit 1381b54a8d
2 changed files with 46 additions and 21 deletions

View File

@ -192,7 +192,7 @@ preproc_data(
), ),
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
data_type => DataType, data_type => DataType,
value => emqx_plugin_libs_rule:preproc_tmpl(Value) value => maybe_preproc_tmpl(Value)
} }
| Acc | Acc
]; ];
@ -207,6 +207,11 @@ preproc_data(_NoMatch, Acc) ->
), ),
Acc. Acc.
maybe_preproc_tmpl(Value) when is_binary(Value) ->
emqx_plugin_libs_rule:preproc_tmpl(Value);
maybe_preproc_tmpl(Value) ->
Value.
proc_data(PreProcessedData, Msg) -> proc_data(PreProcessedData, Msg) ->
NowNS = erlang:system_time(nanosecond), NowNS = erlang:system_time(nanosecond),
Nows = #{ Nows = #{

View File

@ -140,13 +140,16 @@ make_iotdb_payload(DeviceId) ->
make_iotdb_payload(DeviceId, Measurement, Type, Value) -> make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
#{ #{
measurement => Measurement, measurement => s_to_b(Measurement),
data_type => Type, data_type => s_to_b(Type),
value => Value, value => s_to_b(Value),
device_id => DeviceId, device_id => DeviceId,
is_aligned => false is_aligned => false
}. }.
s_to_b(S) when is_list(S) -> list_to_binary(S);
s_to_b(V) -> V.
make_message_fun(Topic, Payload) -> make_message_fun(Topic, Payload) ->
fun() -> fun() ->
MsgId = erlang:unique_integer([positive]), MsgId = erlang:unique_integer([positive]),
@ -169,7 +172,7 @@ iotdb_device(Config) ->
t_sync_query_simple(Config) -> t_sync_query_simple(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
MakeMessageFun = make_message_fun(DeviceId, Payload), MakeMessageFun = make_message_fun(DeviceId, Payload),
IsSuccessCheck = IsSuccessCheck =
fun(Result) -> fun(Result) ->
@ -179,7 +182,7 @@ t_sync_query_simple(Config) ->
t_async_query(Config) -> t_async_query(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
MakeMessageFun = make_message_fun(DeviceId, Payload), MakeMessageFun = make_message_fun(DeviceId, Payload),
IsSuccessCheck = IsSuccessCheck =
fun(Result) -> fun(Result) ->
@ -192,22 +195,39 @@ t_async_query(Config) ->
t_sync_query_aggregated(Config) -> t_sync_query_aggregated(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = [ Payload = [
make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "36"), make_iotdb_payload(DeviceId, "temp", "INT32", "36"),
(make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "37"))#{timestamp => <<"now_us">>}, make_iotdb_payload(DeviceId, "temp", "INT32", 36),
(make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{timestamp => <<"now_ns">>}, make_iotdb_payload(DeviceId, "temp", "INT32", 36.7),
(make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "38"))#{ (make_iotdb_payload(DeviceId, "temp", "INT32", "37"))#{timestamp => <<"now_us">>},
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{timestamp => <<"now_ns">>},
(make_iotdb_payload(DeviceId, "temp", "INT32", "38"))#{
timestamp => <<"1685112026296">> timestamp => <<"1685112026296">>
}, },
make_iotdb_payload(DeviceId, "charged", <<"BOOLEAN">>, "1"), make_iotdb_payload(DeviceId, "temp", "INT64", "36"),
make_iotdb_payload(DeviceId, "stoked", <<"BOOLEAN">>, "true"), make_iotdb_payload(DeviceId, "temp", "INT64", 36),
make_iotdb_payload(DeviceId, "enriched", <<"BOOLEAN">>, <<"TRUE">>), make_iotdb_payload(DeviceId, "temp", "INT64", 36.7),
make_iotdb_payload(DeviceId, "gutted", <<"BOOLEAN">>, <<"True">>), (make_iotdb_payload(DeviceId, "temp", "INT64", "37"))#{timestamp => <<"now_us">>},
make_iotdb_payload(DeviceId, "drained", <<"BOOLEAN">>, "0"), (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{timestamp => <<"now_ns">>},
make_iotdb_payload(DeviceId, "dazzled", <<"BOOLEAN">>, "false"), (make_iotdb_payload(DeviceId, "temp", "INT64", "38"))#{
make_iotdb_payload(DeviceId, "unplugged", <<"BOOLEAN">>, <<"FALSE">>), timestamp => <<"1685112026296">>
make_iotdb_payload(DeviceId, "unraveled", <<"BOOLEAN">>, <<"False">>), },
make_iotdb_payload(DeviceId, "weight", <<"FLOAT">>, "87.3"), make_iotdb_payload(DeviceId, "charged", "BOOLEAN", "1"),
make_iotdb_payload(DeviceId, "foo", <<"TEXT">>, <<"bar">>) make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 1),
make_iotdb_payload(DeviceId, "started", "BOOLEAN", true),
make_iotdb_payload(DeviceId, "stoked", "BOOLEAN", "true"),
make_iotdb_payload(DeviceId, "enriched", "BOOLEAN", "TRUE"),
make_iotdb_payload(DeviceId, "gutted", "BOOLEAN", "True"),
make_iotdb_payload(DeviceId, "drained", "BOOLEAN", "0"),
make_iotdb_payload(DeviceId, "floated", "BOOLEAN", 0),
make_iotdb_payload(DeviceId, "uncharted", "BOOLEAN", false),
make_iotdb_payload(DeviceId, "dazzled", "BOOLEAN", "false"),
make_iotdb_payload(DeviceId, "unplugged", "BOOLEAN", "FALSE"),
make_iotdb_payload(DeviceId, "unraveled", "BOOLEAN", "False"),
make_iotdb_payload(DeviceId, "undecided", "BOOLEAN", undefined),
make_iotdb_payload(DeviceId, "weight", "FLOAT", "87.3"),
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87.3),
make_iotdb_payload(DeviceId, "weight", "FLOAT", 87),
make_iotdb_payload(DeviceId, "foo", "TEXT", "bar")
], ],
MakeMessageFun = make_message_fun(DeviceId, Payload), MakeMessageFun = make_message_fun(DeviceId, Payload),
IsSuccessCheck = IsSuccessCheck =
@ -218,7 +238,7 @@ t_sync_query_aggregated(Config) ->
t_sync_query_fail(Config) -> t_sync_query_fail(Config) ->
DeviceId = iotdb_device(Config), DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", <<"INT32">>, "Anton"), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "Anton"),
MakeMessageFun = make_message_fun(DeviceId, Payload), MakeMessageFun = make_message_fun(DeviceId, Payload),
IsSuccessCheck = IsSuccessCheck =
fun(Result) -> fun(Result) ->