fix(iotdb): robustify type verification

1. let the type is not case sensitive
2. return error if type is invalid
This commit is contained in:
firest 2024-01-12 18:49:55 +08:00
parent 0f1aaa65bc
commit b15106c753
2 changed files with 80 additions and 44 deletions

View File

@ -128,7 +128,7 @@ fields("connection_fields") ->
mk( mk(
hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
#{ #{
desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"), desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"),
default => ?VSN_1_1_X default => ?VSN_1_1_X
} }
)}, )},
@ -422,25 +422,41 @@ proc_data(PreProcessedData, Msg) ->
now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond), now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
now_ns => NowNS now_ns => NowNS
}, },
lists:map( proc_data(PreProcessedData, Msg, Nows, []).
fun(
proc_data(
[
#{ #{
timestamp := TimestampTkn, timestamp := TimestampTkn,
measurement := Measurement, measurement := Measurement,
data_type := DataType0, data_type := DataType0,
value := ValueTkn value := ValueTkn
} }
) -> | T
DataType = emqx_placeholder:proc_tmpl(DataType0, Msg), ],
Msg,
Nows,
Acc
) ->
DataType = list_to_binary(
string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
),
case proc_value(DataType, ValueTkn, Msg) of
{ok, Value} ->
proc_data(T, Msg, Nows, [
#{ #{
timestamp => iot_timestamp(TimestampTkn, Msg, Nows), timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
measurement => emqx_placeholder:proc_tmpl(Measurement, Msg), measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
data_type => DataType, data_type => DataType,
value => proc_value(DataType, ValueTkn, Msg) value => Value
} }
end, | Acc
PreProcessedData ]);
). Error ->
Error
end;
proc_data([], _Msg, _Nows, Acc) ->
{ok, lists:reverse(Acc)}.
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
Timestamp; Timestamp;
@ -459,16 +475,19 @@ iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
binary_to_integer(Timestamp). binary_to_integer(Timestamp).
proc_value(<<"TEXT">>, ValueTkn, Msg) -> proc_value(<<"TEXT">>, ValueTkn, Msg) ->
{ok,
case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
<<"undefined">> -> null; <<"undefined">> -> null;
Val -> Val Val -> Val
end; end};
proc_value(<<"BOOLEAN">>, ValueTkn, Msg) -> proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
convert_bool(replace_var(ValueTkn, Msg)); {ok, convert_bool(replace_var(ValueTkn, Msg))};
proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> -> proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
convert_int(replace_var(ValueTkn, Msg)); {ok, convert_int(replace_var(ValueTkn, Msg))};
proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> -> proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
convert_float(replace_var(ValueTkn, Msg)). {ok, convert_float(replace_var(ValueTkn, Msg))};
proc_value(Type, _, _) ->
{error, {invalid_type, Type}}.
replace_var(Tokens, Data) when is_list(Tokens) -> replace_var(Tokens, Data) when is_list(Tokens) ->
[Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
@ -630,9 +649,9 @@ eval_response_body(Body, Resp) ->
preproc_data_template(DataList) -> preproc_data_template(DataList) ->
Atom2Bin = fun Atom2Bin = fun
(Atom, Converter) when is_atom(Atom) -> (Atom) when is_atom(Atom) ->
Converter(Atom); erlang:atom_to_binary(Atom);
(Bin, _) -> (Bin) ->
Bin Bin
end, end,
lists:map( lists:map(
@ -645,18 +664,9 @@ preproc_data_template(DataList) ->
} }
) -> ) ->
#{ #{
timestamp => emqx_placeholder:preproc_tmpl( timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)),
Atom2Bin(Timestamp, fun erlang:atom_to_binary/1)
),
measurement => emqx_placeholder:preproc_tmpl(Measurement), measurement => emqx_placeholder:preproc_tmpl(Measurement),
data_type => emqx_placeholder:preproc_tmpl( data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)),
Atom2Bin(
DataType,
fun(Atom) ->
erlang:list_to_binary(string:uppercase(erlang:atom_to_list(Atom)))
end
)
),
value => emqx_placeholder:preproc_tmpl(Value) value => emqx_placeholder:preproc_tmpl(Value)
} }
end, end,
@ -681,9 +691,12 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message)
[] -> [] ->
{error, invalid_data}; {error, invalid_data};
DataTemplate -> DataTemplate ->
DataList = proc_data(DataTemplate, Message), case proc_data(DataTemplate, Message) of
{ok, DataList} ->
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn);
Error ->
Error
end
end end
end. end.

View File

@ -646,6 +646,29 @@ t_template(Config) ->
iotdb_reset(Config, TemplateDeviceId), iotdb_reset(Config, TemplateDeviceId),
ok. ok.
t_sync_query_case(Config) ->
DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", "InT32", "36"),
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query
),
Query = <<"select temp from ", DeviceId/binary>>,
{ok, {{_, 200, _}, _, IoTDBResult}} = iotdb_query(Config, Query),
?assertMatch(
#{<<"values">> := [[36]]},
emqx_utils_json:decode(IoTDBResult)
).
t_sync_query_invalid_type(Config) ->
DeviceId = iotdb_device(Config),
Payload = make_iotdb_payload(DeviceId, "temp", "IxT32", "36"),
MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload),
IsInvalidType = fun(Result) -> ?assertMatch({error, {invalid_type, _}}, Result) end,
ok = emqx_bridge_v2_testlib:t_sync_query(
Config, MakeMessageFun, IsInvalidType, iotdb_bridge_on_query
).
is_empty(null) -> true; is_empty(null) -> true;
is_empty([]) -> true; is_empty([]) -> true;
is_empty([[]]) -> true; is_empty([[]]) -> true;