Merge pull request #10770 from kjellwinblad/kjell/fix/iotdb_need_to_parse_input_bug/EMQX-9854

fix: IoTDB bridge incoming payload needs to be parsed as JSON
This commit is contained in:
JianBo He 2023-05-24 10:17:23 +08:00 committed by GitHub
commit 0d3de36f4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 28 deletions

View File

@ -5,7 +5,7 @@
-ifndef(EMQX_BRIDGE_IOTDB_HRL). -ifndef(EMQX_BRIDGE_IOTDB_HRL).
-define(EMQX_BRIDGE_IOTDB_HRL, true). -define(EMQX_BRIDGE_IOTDB_HRL, true).
-define(VSN_1_0_X, 'v1.0.x'). -define(VSN_1_X, 'v1.x').
-define(VSN_0_13_X, 'v0.13.x'). -define(VSN_0_13_X, 'v0.13.x').
-endif. -endif.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [ {application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"}, {description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_impl emqx_bridge_iotdb_impl

View File

@ -109,10 +109,10 @@ basic_config() ->
)}, )},
{iotdb_version, {iotdb_version,
mk( mk(
hoconsc:enum([?VSN_1_0_X, ?VSN_0_13_X]), hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
#{ #{
desc => ?DESC("config_iotdb_version"), desc => ?DESC("config_iotdb_version"),
default => ?VSN_1_0_X default => ?VSN_1_X
} }
)} )}
] ++ resource_creation_opts() ++ ] ++ resource_creation_opts() ++
@ -217,7 +217,7 @@ conn_bridge_example(_Method, Type) ->
is_aligned => false, is_aligned => false,
device_id => <<"my_device">>, device_id => <<"my_device">>,
base_url => <<"http://iotdb.local:18080/">>, base_url => <<"http://iotdb.local:18080/">>,
iotdb_version => ?VSN_1_0_X, iotdb_version => ?VSN_1_X,
connect_timeout => <<"15s">>, connect_timeout => <<"15s">>,
pool_type => <<"random">>, pool_type => <<"random">>,
pool_size => 8, pool_size => 8,

View File

@ -143,14 +143,29 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
preproc_data(DataList) -> make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
lists:map( emqx_utils_json:decode(PayloadUnparsed, [return_maps]);
fun( make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
lists:map(fun make_parsed_payload/1, PayloadUnparsed);
make_parsed_payload(
#{ #{
measurement := Measurement, measurement := Measurement,
data_type := DataType, data_type := DataType,
value := Value value := Value
} = Data } = Data
) ->
Data#{
<<"measurement">> => Measurement,
<<"data_type">> => DataType,
<<"value">> => Value
}.
preproc_data(
#{
<<"measurement">> := Measurement,
<<"data_type">> := DataType,
<<"value">> := Value
} = Data
) -> ) ->
#{ #{
timestamp => emqx_plugin_libs_rule:preproc_tmpl( timestamp => emqx_plugin_libs_rule:preproc_tmpl(
@ -159,8 +174,11 @@ preproc_data(DataList) ->
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
data_type => DataType, data_type => DataType,
value => emqx_plugin_libs_rule:preproc_tmpl(Value) value => emqx_plugin_libs_rule:preproc_tmpl(Value)
} }.
end,
preproc_data_list(DataList) ->
lists:map(
fun preproc_data/1,
DataList DataList
). ).
@ -258,12 +276,13 @@ convert_float(Str) when is_binary(Str) ->
convert_float(undefined) -> convert_float(undefined) ->
null. null.
make_iotdb_insert_request(Message, State) -> make_iotdb_insert_request(MessageUnparsedPayload, State) ->
Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
IsAligned = maps:get(is_aligned, State, false), IsAligned = maps:get(is_aligned, State, false),
DeviceId = device_id(Message, State), DeviceId = device_id(Message, State),
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_0_X), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
Payload = make_list(maps:get(payload, Message)), Payload = make_list(maps:get(payload, Message)),
PreProcessedData = preproc_data(Payload), PreProcessedData = preproc_data_list(Payload),
DataList = proc_data(PreProcessedData, Message), DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
@ -330,15 +349,15 @@ insert_value(1, Data, [Value | Values]) ->
insert_value(Index, Data, [Value | Values]) -> insert_value(Index, Data, [Value | Values]) ->
[[null | Value] | insert_value(Index - 1, Data, Values)]. [[null | Value] | insert_value(Index - 1, Data, Values)].
iotdb_field_key(is_aligned, ?VSN_1_0_X) -> iotdb_field_key(is_aligned, ?VSN_1_X) ->
<<"is_aligned">>; <<"is_aligned">>;
iotdb_field_key(is_aligned, ?VSN_0_13_X) -> iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
<<"isAligned">>; <<"isAligned">>;
iotdb_field_key(device_id, ?VSN_1_0_X) -> iotdb_field_key(device_id, ?VSN_1_X) ->
<<"device">>; <<"device">>;
iotdb_field_key(device_id, ?VSN_0_13_X) -> iotdb_field_key(device_id, ?VSN_0_13_X) ->
<<"deviceId">>; <<"deviceId">>;
iotdb_field_key(data_types, ?VSN_1_0_X) -> iotdb_field_key(data_types, ?VSN_1_X) ->
<<"data_types">>; <<"data_types">>;
iotdb_field_key(data_types, ?VSN_0_13_X) -> iotdb_field_key(data_types, ?VSN_0_13_X) ->
<<"dataTypes">>. <<"dataTypes">>.
@ -350,6 +369,8 @@ device_id(Message, State) ->
case maps:get(device_id, State, undefined) of case maps:get(device_id, State, undefined) of
undefined -> undefined ->
case maps:get(payload, Message) of case maps:get(payload, Message) of
#{<<"device_id">> := DeviceId} ->
DeviceId;
#{device_id := DeviceId} -> #{device_id := DeviceId} ->
DeviceId; DeviceId;
_NotFound -> _NotFound ->