diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 70f4ea6f8..5fd3515ad 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -169,14 +169,9 @@ ets(Cont, ContF) -> %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. -spec csv(binary()) -> stream(map()). csv(Bin) when is_binary(Bin) -> - CSVData = csv_data(Bin), Reader = fun _Iter(Headers, Lines) -> case csv_read_line(Lines) of - {ok, Line, Rest} -> - %% XXX: not support ' ' for a field? - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ - global, trim_all - ]), + {Fields, Rest} -> case length(Fields) == length(Headers) of true -> User = maps:from_list(lists:zip(Headers, Fields)), @@ -188,27 +183,17 @@ csv(Bin) when is_binary(Bin) -> [] end end, - case get_csv_header(CSVData) of - {ok, CSVHeaders, CSVLines} -> + HeadersAndLines = binary:split(Bin, [<<"\r">>, <<"\n">>], [global, trim_all]), + case csv_read_line(HeadersAndLines) of + {CSVHeaders, CSVLines} -> fun() -> Reader(CSVHeaders, CSVLines) end; - error -> + eof -> empty() end. -csv_data(Data) -> - Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), - {csv_data, Lines}. - -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - error - end. - -csv_read_line({csv_data, [Line | Lines]}) -> - {ok, Line, {csv_data, Lines}}; -csv_read_line({csv_data, []}) -> +csv_read_line([Line | Lines]) -> + %% XXX: not support ' ' for the field value + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {Fields, Lines}; +csv_read_line([]) -> eof.