chore(authn_mnesia): use emqx_utils_stream module to imporve reusability

This commit is contained in:
JianBo He 2024-01-29 10:20:35 +08:00
parent bcbd48ae58
commit 9915c85b0b
2 changed files with 73 additions and 72 deletions

View File

@ -190,6 +190,7 @@ import_users({PasswordType, Filename, FileData}, State) ->
warning, warning,
#{ #{
msg => "import_users_failed", msg => "import_users_failed",
reason => Reason1,
type => PasswordType, type => PasswordType,
filename => Filename, filename => Filename,
stacktrace => Stk stacktrace => Stk
@ -451,11 +452,11 @@ group_match_spec(UserGroup, QString) ->
parse_import_users(Filename, FileData, Convertor) -> parse_import_users(Filename, FileData, Convertor) ->
Eval = fun _Eval(F) -> Eval = fun _Eval(F) ->
case F() of case F() of
eof -> []; [] -> [];
{User, F1} -> [User | _Eval(F1)] [User | F1] -> [Convertor(User) | _Eval(F1)]
end end
end, end,
ReaderFn = reader_fn(Filename, FileData, Convertor), ReaderFn = reader_fn(Filename, FileData),
Users = lists:reverse(Eval(ReaderFn)), Users = lists:reverse(Eval(ReaderFn)),
NewUsersCount = NewUsersCount =
lists:foldl( lists:foldl(
@ -478,65 +479,30 @@ parse_import_users(Filename, FileData, Convertor) ->
), ),
{NewUsersCount, Users}. {NewUsersCount, Users}.
reader_fn(prepared_user_list, Data, Convertor) when is_list(Data) -> reader_fn(prepared_user_list, List) when is_list(List) ->
reader(prepared_user_list, Data, Convertor); %% Example: [#{<<"user_id">> => <<>>, ...}]
reader_fn(Filename0, Data, Convertor) -> emqx_utils_stream:list(List);
Filename = to_binary(Filename0), reader_fn(Filename0, Data) ->
case filename:extension(Filename) of case filename:extension(to_binary(Filename0)) of
<<".json">> -> <<".json">> ->
reader(json, Data, Convertor); %% Example: data/user-credentials.json
case emqx_utils_json:safe_decode(Data, [return_maps]) of
{ok, List} when is_list(List) ->
emqx_utils_stream:list(List);
{ok, _} ->
error(unknown_file_format);
{error, Reason} ->
error(Reason)
end;
<<".csv">> -> <<".csv">> ->
reader(csv, Data, Convertor); %% Example: data/user-credentials.csv
emqx_utils_stream:csv(Data);
<<>> -> <<>> ->
error(unknown_file_format); error(unknown_file_format);
Extension -> Extension ->
error({unsupported_file_format, Extension}) error({unsupported_file_format, Extension})
end. end.
%% Example: data/user-credentials.json
reader(json, Data, Convertor) when is_binary(Data) ->
case emqx_utils_json:safe_decode(Data, [return_maps]) of
{ok, List} ->
reader(prepared_user_list, List, Convertor);
{error, Reason} ->
error(Reason)
end;
%% Example: data/user-credentials.csv
reader(csv, Data, Convertor) when is_binary(Data) ->
CSVData = csv_data(Data),
Reader = fun _Iter(Headers, Lines) ->
case csv_read_line(Lines) of
{ok, Line, Rest} ->
%% XXX: not support ' ' for a field?
Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [
global, trim_all
]),
case length(Fields) == length(Headers) of
true ->
User = maps:from_list(lists:zip(Headers, Fields)),
{Convertor(User), fun() -> _Iter(Headers, Rest) end};
false ->
error(bad_format)
end;
eof ->
eof
end
end,
case get_csv_header(CSVData) of
{ok, CSVHeaders, CSVLines} ->
fun() -> Reader(CSVHeaders, CSVLines) end;
{error, Reason} ->
error(Reason)
end;
%% Example: [#{<<"user_id">> => <<>>, ...}]
reader(prepared_user_list, Data, Convertor) when is_list(Data) ->
Reader =
fun
_Iter([]) -> eof;
_Iter([User | Rest]) -> {Convertor(User), fun() -> _Iter(Rest) end}
end,
fun() -> Reader(Data) end.
convertor(PasswordType, State) -> convertor(PasswordType, State) ->
fun(User) -> fun(User) ->
convert_user(User, PasswordType, State) convert_user(User, PasswordType, State)
@ -568,21 +534,3 @@ find_password_hash(_, _, _) ->
is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true; is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true;
is_superuser(#{<<"is_superuser">> := true}) -> true; is_superuser(#{<<"is_superuser">> := true}) -> true;
is_superuser(_) -> false. is_superuser(_) -> false.
csv_data(Data) ->
Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]),
{csv_data, Lines}.
get_csv_header(CSV) ->
case csv_read_line(CSV) of
{ok, Line, NewCSV} ->
Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
{ok, Seq, NewCSV};
eof ->
{error, empty_file}
end.
csv_read_line({csv_data, [Line | Lines]}) ->
{ok, Line, {csv_data, Lines}};
csv_read_line({csv_data, []}) ->
eof.

View File

@ -37,6 +37,11 @@
ets/1 ets/1
]). ]).
%% Streams from .csv data
-export([
csv/1
]).
-export_type([stream/1]). -export_type([stream/1]).
%% @doc A stream is essentially a lazy list. %% @doc A stream is essentially a lazy list.
@ -157,3 +162,51 @@ ets(Cont, ContF) ->
[] []
end end
end. end.
%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
-spec csv(binary()) -> stream(map()).
csv(Bin) when is_binary(Bin) ->
CSVData = csv_data(Bin),
Reader = fun _Iter(Headers, Lines) ->
case csv_read_line(Lines) of
{ok, Line, Rest} ->
%% XXX: not support ' ' for a field?
Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [
global, trim_all
]),
case length(Fields) == length(Headers) of
true ->
User = maps:from_list(lists:zip(Headers, Fields)),
[User | fun() -> _Iter(Headers, Rest) end];
false ->
error(bad_format)
end;
eof ->
[]
end
end,
case get_csv_header(CSVData) of
{ok, CSVHeaders, CSVLines} ->
fun() -> Reader(CSVHeaders, CSVLines) end;
{error, Reason} ->
error(Reason)
end.
csv_data(Data) ->
Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]),
{csv_data, Lines}.
get_csv_header(CSV) ->
case csv_read_line(CSV) of
{ok, Line, NewCSV} ->
Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]),
{ok, Seq, NewCSV};
eof ->
{error, empty_file}
end.
csv_read_line({csv_data, [Line | Lines]}) ->
{ok, Line, {csv_data, Lines}};
csv_read_line({csv_data, []}) ->
eof.