diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index fec50f779..d7b45c18d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -444,19 +444,22 @@ proc_data( DataType = list_to_binary( string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg))) ), - case proc_value(DataType, ValueTkn, Msg) of - {ok, Value} -> - proc_data(T, Msg, Nows, [ - #{ - timestamp => iot_timestamp(TimestampTkn, Msg, Nows), - measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), - data_type => DataType, - value => Value - } - | Acc - ]); - Error -> - Error + try + proc_data(T, Msg, Nows, [ + #{ + timestamp => iot_timestamp(TimestampTkn, Msg, Nows), + measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), + data_type => DataType, + value => proc_value(DataType, ValueTkn, Msg) + } + | Acc + ]) + catch + throw:Reason -> + {error, Reason}; + Error:Reason:Stacktrace -> + ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}), + {error, invalid_data} end; proc_data([], _Msg, _Nows, Acc) -> {ok, lists:reverse(Acc)}. @@ -478,19 +481,18 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). proc_value(<<"TEXT">>, ValueTkn, Msg) -> - {ok, - case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of - <<"undefined">> -> null; - Val -> Val - end}; + case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of + <<"undefined">> -> null; + Val -> Val + end; proc_value(<<"BOOLEAN">>, ValueTkn, Msg) -> - {ok, convert_bool(replace_var(ValueTkn, Msg))}; + convert_bool(replace_var(ValueTkn, Msg)); proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> -> - {ok, convert_int(replace_var(ValueTkn, Msg))}; + convert_int(replace_var(ValueTkn, Msg)); proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> - {ok, convert_float(replace_var(ValueTkn, Msg))}; + convert_float(replace_var(ValueTkn, Msg)); proc_value(Type, _, _) -> - {error, {invalid_type, Type}}. + throw(#{reason => invalid_type, type => Type}). replace_var(Tokens, Data) when is_list(Tokens) -> [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 8145faf33..1093993b2 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -664,7 +664,16 @@ t_sync_query_invalid_type(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end, + IsInvalidType = fun(Result) -> ?assertMatch({error, #{reason := invalid_type}}, Result) end, + ok = emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query + ). + +t_sync_query_unmatched_type(Config) -> + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "BOOLEAN", "not boolean"), + MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), + IsInvalidType = fun(Result) -> ?assertMatch({error, invalid_data}, Result) end, ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query ).