feat(emqx_bridge_iotdb): handle bad message format gracefully

This commit is contained in:
Stefan Strigler 2023-05-26 13:52:16 +02:00
parent 0f080cda66
commit 6f54220a51
1 changed files with 73 additions and 45 deletions

View File

@ -110,15 +110,19 @@ on_query(InstanceId, {send_message, Message}, State) ->
send_message => Message,
state => emqx_utils:redact(State)
}),
IoTDBPayload = make_iotdb_insert_request(Message, State),
handle_response(
emqx_connector_http:on_query(
InstanceId, {send_message, IoTDBPayload}, State
)
).
case make_iotdb_insert_request(Message, State) of
{ok, IoTDBPayload} ->
handle_response(
emqx_connector_http:on_query(
InstanceId, {send_message, IoTDBPayload}, State
)
);
Error ->
Error
end.
-spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
{ok, pid()}.
{ok, pid()} | {error, empty_request}.
on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
?SLOG(debug, #{
msg => "iotdb_bridge_on_query_async_called",
@ -126,18 +130,22 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
send_message => Message,
state => emqx_utils:redact(State)
}),
IoTDBPayload = make_iotdb_insert_request(Message, State),
ReplyFunAndArgs =
{
fun(Result) ->
Response = handle_response(Result),
emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
end,
[]
},
emqx_connector_http:on_query_async(
InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
).
case make_iotdb_insert_request(Message, State) of
{ok, IoTDBPayload} ->
ReplyFunAndArgs =
{
fun(Result) ->
Response = handle_response(Result),
emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
end,
[]
},
emqx_connector_http:on_query_async(
InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State
);
Error ->
Error
end.
%%--------------------------------------------------------------------
%% Internal Functions
@ -160,27 +168,42 @@ make_parsed_payload(
<<"value">> => Value
}.
preproc_data_list(DataList) ->
lists:foldl(
fun preproc_data/2,
[],
DataList
).
preproc_data(
#{
<<"measurement">> := Measurement,
<<"data_type">> := DataType,
<<"value">> := Value
} = Data
} = Data,
Acc
) ->
#{
timestamp => emqx_plugin_libs_rule:preproc_tmpl(
maps:get(<<"timestamp">>, Data, <<"now">>)
),
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
data_type => DataType,
value => emqx_plugin_libs_rule:preproc_tmpl(Value)
}.
preproc_data_list(DataList) ->
lists:map(
fun preproc_data/1,
DataList
).
[
#{
timestamp => emqx_plugin_libs_rule:preproc_tmpl(
maps:get(<<"timestamp">>, Data, <<"now">>)
),
measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement),
data_type => DataType,
value => emqx_plugin_libs_rule:preproc_tmpl(Value)
}
| Acc
];
preproc_data(_NoMatch, Acc) ->
?SLOG(
warning,
#{
msg => "iotdb_bridge_preproc_data_failed",
required_fields => ['measurement', 'data_type', 'value'],
received => _NoMatch
}
),
Acc.
proc_data(PreProcessedData, Msg) ->
NowNS = erlang:system_time(nanosecond),
@ -282,18 +305,23 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) ->
DeviceId = device_id(Message, State),
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
Payload = make_list(maps:get(payload, Message)),
PreProcessedData = preproc_data_list(Payload),
DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
maps:merge(Rows, #{
iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
iotdb_field_key(device_id, IotDBVsn) => DeviceId
}).
case preproc_data_list(Payload) of
[] ->
{error, invalid_data};
PreProcessedData ->
DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
{ok,
maps:merge(Rows, #{
iotdb_field_key(is_aligned, IotDBVsn) => IsAligned,
iotdb_field_key(device_id, IotDBVsn) => DeviceId
})}
end.
replace_dtypes(Rows, IotDBVsn) ->
{Types, Map} = maps:take(dtypes, Rows),
Map#{iotdb_field_key(data_types, IotDBVsn) => Types}.
replace_dtypes(Rows0, IotDBVsn) ->
{Types, Rows} = maps:take(dtypes, Rows0),
Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}.
aggregate_rows(DataList, InitAcc) ->
lists:foldr(