From 9915c85b0be14832382b5a32aaa5f5f32ff61696 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 10:20:35 +0800 Subject: [PATCH] chore(authn_mnesia): use emqx_utils_stream module to imporve reusability --- .../src/emqx_authn_mnesia.erl | 92 ++++--------------- apps/emqx_utils/src/emqx_utils_stream.erl | 53 +++++++++++ 2 files changed, 73 insertions(+), 72 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index b6a3588bf..282fda194 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -190,6 +190,7 @@ import_users({PasswordType, Filename, FileData}, State) -> warning, #{ msg => "import_users_failed", + reason => Reason1, type => PasswordType, filename => Filename, stacktrace => Stk @@ -451,11 +452,11 @@ group_match_spec(UserGroup, QString) -> parse_import_users(Filename, FileData, Convertor) -> Eval = fun _Eval(F) -> case F() of - eof -> []; - {User, F1} -> [User | _Eval(F1)] + [] -> []; + [User | F1] -> [Convertor(User) | _Eval(F1)] end end, - ReaderFn = reader_fn(Filename, FileData, Convertor), + ReaderFn = reader_fn(Filename, FileData), Users = lists:reverse(Eval(ReaderFn)), NewUsersCount = lists:foldl( @@ -478,65 +479,30 @@ parse_import_users(Filename, FileData, Convertor) -> ), {NewUsersCount, Users}. -reader_fn(prepared_user_list, Data, Convertor) when is_list(Data) -> - reader(prepared_user_list, Data, Convertor); -reader_fn(Filename0, Data, Convertor) -> - Filename = to_binary(Filename0), - case filename:extension(Filename) of +reader_fn(prepared_user_list, List) when is_list(List) -> + %% Example: [#{<<"user_id">> => <<>>, ...}] + emqx_utils_stream:list(List); +reader_fn(Filename0, Data) -> + case filename:extension(to_binary(Filename0)) of <<".json">> -> - reader(json, Data, Convertor); + %% Example: data/user-credentials.json + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, List} when is_list(List) -> + emqx_utils_stream:list(List); + {ok, _} -> + error(unknown_file_format); + {error, Reason} -> + error(Reason) + end; <<".csv">> -> - reader(csv, Data, Convertor); + %% Example: data/user-credentials.csv + emqx_utils_stream:csv(Data); <<>> -> error(unknown_file_format); Extension -> error({unsupported_file_format, Extension}) end. -%% Example: data/user-credentials.json -reader(json, Data, Convertor) when is_binary(Data) -> - case emqx_utils_json:safe_decode(Data, [return_maps]) of - {ok, List} -> - reader(prepared_user_list, List, Convertor); - {error, Reason} -> - error(Reason) - end; -%% Example: data/user-credentials.csv -reader(csv, Data, Convertor) when is_binary(Data) -> - CSVData = csv_data(Data), - Reader = fun _Iter(Headers, Lines) -> - case csv_read_line(Lines) of - {ok, Line, Rest} -> - %% XXX: not support ' ' for a field? - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ - global, trim_all - ]), - case length(Fields) == length(Headers) of - true -> - User = maps:from_list(lists:zip(Headers, Fields)), - {Convertor(User), fun() -> _Iter(Headers, Rest) end}; - false -> - error(bad_format) - end; - eof -> - eof - end - end, - case get_csv_header(CSVData) of - {ok, CSVHeaders, CSVLines} -> - fun() -> Reader(CSVHeaders, CSVLines) end; - {error, Reason} -> - error(Reason) - end; -%% Example: [#{<<"user_id">> => <<>>, ...}] -reader(prepared_user_list, Data, Convertor) when is_list(Data) -> - Reader = - fun - _Iter([]) -> eof; - _Iter([User | Rest]) -> {Convertor(User), fun() -> _Iter(Rest) end} - end, - fun() -> Reader(Data) end. - convertor(PasswordType, State) -> fun(User) -> convert_user(User, PasswordType, State) @@ -568,21 +534,3 @@ find_password_hash(_, _, _) -> is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true; is_superuser(#{<<"is_superuser">> := true}) -> true; is_superuser(_) -> false. - -csv_data(Data) -> - Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), - {csv_data, Lines}. - -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - {error, empty_file} - end. - -csv_read_line({csv_data, [Line | Lines]}) -> - {ok, Line, {csv_data, Lines}}; -csv_read_line({csv_data, []}) -> - eof. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 21321400d..dbaf542bf 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -37,6 +37,11 @@ ets/1 ]). +%% Streams from .csv data +-export([ + csv/1 +]). + -export_type([stream/1]). %% @doc A stream is essentially a lazy list. @@ -157,3 +162,51 @@ ets(Cont, ContF) -> [] end end. + +%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once. +%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. +-spec csv(binary()) -> stream(map()). +csv(Bin) when is_binary(Bin) -> + CSVData = csv_data(Bin), + Reader = fun _Iter(Headers, Lines) -> + case csv_read_line(Lines) of + {ok, Line, Rest} -> + %% XXX: not support ' ' for a field? + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ + global, trim_all + ]), + case length(Fields) == length(Headers) of + true -> + User = maps:from_list(lists:zip(Headers, Fields)), + [User | fun() -> _Iter(Headers, Rest) end]; + false -> + error(bad_format) + end; + eof -> + [] + end + end, + case get_csv_header(CSVData) of + {ok, CSVHeaders, CSVLines} -> + fun() -> Reader(CSVHeaders, CSVLines) end; + {error, Reason} -> + error(Reason) + end. + +csv_data(Data) -> + Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), + {csv_data, Lines}. + +get_csv_header(CSV) -> + case csv_read_line(CSV) of + {ok, Line, NewCSV} -> + Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {ok, Seq, NewCSV}; + eof -> + {error, empty_file} + end. + +csv_read_line({csv_data, [Line | Lines]}) -> + {ok, Line, {csv_data, Lines}}; +csv_read_line({csv_data, []}) -> + eof.