Merge pull request #12335 from lafirest/fix/type_clause
fix(iotdb): enhances type checking when converting value
This commit is contained in:
commit
6d6242c27a
|
@ -444,19 +444,22 @@ proc_data(
|
||||||
DataType = list_to_binary(
|
DataType = list_to_binary(
|
||||||
string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
|
string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
|
||||||
),
|
),
|
||||||
case proc_value(DataType, ValueTkn, Msg) of
|
try
|
||||||
{ok, Value} ->
|
|
||||||
proc_data(T, Msg, Nows, [
|
proc_data(T, Msg, Nows, [
|
||||||
#{
|
#{
|
||||||
timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
|
||||||
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
|
||||||
data_type => DataType,
|
data_type => DataType,
|
||||||
value => Value
|
value => proc_value(DataType, ValueTkn, Msg)
|
||||||
}
|
}
|
||||||
| Acc
|
| Acc
|
||||||
]);
|
])
|
||||||
Error ->
|
catch
|
||||||
Error
|
throw:Reason ->
|
||||||
|
{error, Reason};
|
||||||
|
Error:Reason:Stacktrace ->
|
||||||
|
?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
|
||||||
|
{error, invalid_data}
|
||||||
end;
|
end;
|
||||||
proc_data([], _Msg, _Nows, Acc) ->
|
proc_data([], _Msg, _Nows, Acc) ->
|
||||||
{ok, lists:reverse(Acc)}.
|
{ok, lists:reverse(Acc)}.
|
||||||
|
@ -478,19 +481,18 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
|
||||||
binary_to_integer(Timestamp).
|
binary_to_integer(Timestamp).
|
||||||
|
|
||||||
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
|
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
|
||||||
{ok,
|
|
||||||
case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
|
case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
|
||||||
<<"undefined">> -> null;
|
<<"undefined">> -> null;
|
||||||
Val -> Val
|
Val -> Val
|
||||||
end};
|
end;
|
||||||
proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
|
proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
|
||||||
{ok, convert_bool(replace_var(ValueTkn, Msg))};
|
convert_bool(replace_var(ValueTkn, Msg));
|
||||||
proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
|
proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
|
||||||
{ok, convert_int(replace_var(ValueTkn, Msg))};
|
convert_int(replace_var(ValueTkn, Msg));
|
||||||
proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
|
proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
|
||||||
{ok, convert_float(replace_var(ValueTkn, Msg))};
|
convert_float(replace_var(ValueTkn, Msg));
|
||||||
proc_value(Type, _, _) ->
|
proc_value(Type, _, _) ->
|
||||||
{error, {invalid_type, Type}}.
|
throw(#{reason => invalid_type, type => Type}).
|
||||||
|
|
||||||
replace_var(Tokens, Data) when is_list(Tokens) ->
|
replace_var(Tokens, Data) when is_list(Tokens) ->
|
||||||
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||||
|
|
|
@ -664,7 +664,16 @@ t_sync_query_invalid_type(Config) ->
|
||||||
DeviceId = iotdb_device(Config),
|
DeviceId = iotdb_device(Config),
|
||||||
Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
|
Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
|
||||||
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
||||||
IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end,
|
IsInvalidType = fun(Result) -> ?assertMatch({error, #{reason := invalid_type}}, Result) end,
|
||||||
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||||
|
Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
|
||||||
|
).
|
||||||
|
|
||||||
|
t_sync_query_unmatched_type(Config) ->
|
||||||
|
DeviceId = iotdb_device(Config),
|
||||||
|
Payload = make_iotdb_payload(DeviceId, "temp", "BOOLEAN", "not boolean"),
|
||||||
|
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
|
||||||
|
IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end,
|
||||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||||
Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
|
Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
|
||||||
).
|
).
|
||||||
|
|
Loading…
Reference in New Issue