diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index f19fc0839..abb09ad5c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -110,15 +110,19 @@ on_query(InstanceId, {send_message, Message}, State) -> send_message => Message, state => emqx_utils:redact(State) }), - IoTDBPayload = make_iotdb_insert_request(Message, State), - handle_response( - emqx_connector_http:on_query( - InstanceId, {send_message, IoTDBPayload}, State - ) - ). + case make_iotdb_insert_request(Message, State) of + {ok, IoTDBPayload} -> + handle_response( + emqx_connector_http:on_query( + InstanceId, {send_message, IoTDBPayload}, State + ) + ); + Error -> + Error + end. -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> - {ok, pid()}. + {ok, pid()} | {error, empty_request}. on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_async_called", @@ -126,18 +130,22 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> send_message => Message, state => emqx_utils:redact(State) }), - IoTDBPayload = make_iotdb_insert_request(Message, State), - ReplyFunAndArgs = - { - fun(Result) -> - Response = handle_response(Result), - emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) - end, - [] - }, - emqx_connector_http:on_query_async( - InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State - ). + case make_iotdb_insert_request(Message, State) of + {ok, IoTDBPayload} -> + ReplyFunAndArgs = + { + fun(Result) -> + Response = handle_response(Result), + emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response) + end, + [] + }, + emqx_connector_http:on_query_async( + InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State + ); + Error -> + Error + end. %%-------------------------------------------------------------------- %% Internal Functions @@ -160,27 +168,42 @@ make_parsed_payload( <<"value">> => Value }. +preproc_data_list(DataList) -> + lists:foldl( + fun preproc_data/2, + [], + DataList + ). + preproc_data( #{ <<"measurement">> := Measurement, <<"data_type">> := DataType, <<"value">> := Value - } = Data + } = Data, + Acc ) -> - #{ - timestamp => emqx_plugin_libs_rule:preproc_tmpl( - maps:get(<<"timestamp">>, Data, <<"now">>) - ), - measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), - data_type => DataType, - value => emqx_plugin_libs_rule:preproc_tmpl(Value) - }. - -preproc_data_list(DataList) -> - lists:map( - fun preproc_data/1, - DataList - ). + [ + #{ + timestamp => emqx_plugin_libs_rule:preproc_tmpl( + maps:get(<<"timestamp">>, Data, <<"now">>) + ), + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_plugin_libs_rule:preproc_tmpl(Value) + } + | Acc + ]; +preproc_data(_NoMatch, Acc) -> + ?SLOG( + warning, + #{ + msg => "iotdb_bridge_preproc_data_failed", + required_fields => ['measurement', 'data_type', 'value'], + received => _NoMatch + } + ), + Acc. proc_data(PreProcessedData, Msg) -> NowNS = erlang:system_time(nanosecond), @@ -282,18 +305,23 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) -> DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), Payload = make_list(maps:get(payload, Message)), - PreProcessedData = preproc_data_list(Payload), - DataList = proc_data(PreProcessedData, Message), - InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - }). + case preproc_data_list(Payload) of + [] -> + {error, invalid_data}; + PreProcessedData -> + DataList = proc_data(PreProcessedData, Message), + InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + {ok, + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + })} + end. -replace_dtypes(Rows, IotDBVsn) -> - {Types, Map} = maps:take(dtypes, Rows), - Map#{iotdb_field_key(data_types, IotDBVsn) => Types}. +replace_dtypes(Rows0, IotDBVsn) -> + {Types, Rows} = maps:take(dtypes, Rows0), + Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}. aggregate_rows(DataList, InitAcc) -> lists:foldr(