diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index 0c9631896..8a50bd19f 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -449,14 +449,8 @@ group_match_spec(UserGroup, QString) -> %% parse import file/data parse_import_users(Filename, FileData, Convertor) -> - Eval = fun _Eval(F) -> - case F() of - [] -> []; - [User | F1] -> [Convertor(User) | _Eval(F1)] - end - end, - ReaderFn = reader_fn(Filename, FileData), - Users = Eval(ReaderFn), + UserStream = reader_fn(Filename, FileData), + Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)), NewUsersCount = lists:foldl( fun( diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 6ff43528b..510b3e377 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,21 +23,23 @@ const/1, mqueue/1, map/2, - filter/2, - foreach/2, transpose/1, chain/1, chain/2, repeat/1, interleave/2, - limit_length/2 + limit_length/2, + filter/2, + drop/2, + chainmap/2 ]). %% Evaluating -export([ next/1, consume/1, - consume/2 + consume/2, + foreach/2 ]). %% Streams from ETS tables @@ -53,8 +55,9 @@ -export_type([stream/1]). %% @doc A stream is essentially a lazy list. --type stream(T) :: fun(() -> next(T) | []). --type next(T) :: nonempty_improper_list(T, stream(T)). +-type stream_tail(T) :: fun(() -> next(T) | []). +-type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T). +-type next(T) :: nonempty_improper_list(T, stream_tail(T)). -dialyzer(no_improper_lists). @@ -65,15 +68,12 @@ %% @doc Make a stream that produces no values. -spec empty() -> stream(none()). empty() -> - fun() -> [] end. + []. %% @doc Make a stream out of the given list. %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. -spec list([T]) -> stream(T). -list([]) -> - empty(); -list([X | Rest]) -> - fun() -> [X | list(Rest)] end. +list(L) -> L. %% @doc Make a stream with a single element infinitely repeated -spec const(T) -> stream(T). @@ -107,7 +107,7 @@ map(F, S) -> %% @doc Make a stream by filtering the underlying stream with a predicate function. filter(F, S) -> FilterNext = fun FilterNext(St) -> - case emqx_utils_stream:next(St) of + case next(St) of [X | Rest] -> case F(X) of true -> @@ -123,7 +123,7 @@ filter(F, S) -> %% @doc Consumes the stream and applies the given function to each element. foreach(F, S) -> - case emqx_utils_stream:next(S) of + case next(S) of [X | Rest] -> F(X), foreach(F, Rest); @@ -131,6 +131,37 @@ foreach(F, S) -> ok end. +%% @doc Drops N first elements from the stream +-spec drop(non_neg_integer(), stream(T)) -> stream(T). +drop(N, S) -> + DropNext = fun DropNext(M, St) -> + case next(St) of + [_X | Rest] when M > 0 -> + DropNext(M - 1, Rest); + Next -> + Next + end + end, + fun() -> DropNext(N, S) end. + +%% @doc Stream version of flatmap. +-spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y). +chainmap(F, S) -> + ChainNext = fun ChainNext(St) -> + case next(St) of + [X | Rest] -> + case next(F(X)) of + [Y | YRest] -> + [Y | chain(YRest, chainmap(F, Rest))]; + [] -> + ChainNext(Rest) + end; + [] -> + [] + end + end, + fun() -> ChainNext(S) end. + %% @doc Transpose a list of streams into a stream producing lists of their respective values. %% The resulting stream is as long as the shortest of the input streams. -spec transpose([stream(X)]) -> stream([X]). @@ -201,7 +232,7 @@ repeat(S) -> interleave(L0, ContinueAtEmpty) -> L = lists:map( fun - (Stream) when is_function(Stream) -> + (Stream) when is_function(Stream) or is_list(Stream) -> {1, Stream}; (A = {N, _}) when N >= 0 -> A @@ -230,8 +261,12 @@ limit_length(N, S) when N >= 0 -> %% @doc Produce the next value from the stream. -spec next(stream(T)) -> next(T) | []. -next(S) -> - S(). +next(EvalNext) when is_function(EvalNext) -> + EvalNext(); +next([_ | _Rest] = EvaluatedNext) -> + EvaluatedNext; +next([]) -> + []. %% @doc Consume the stream and return a list of all produced values. -spec consume(stream(T)) -> [T]. @@ -279,9 +314,9 @@ ets(Cont, ContF) -> fun() -> case ContF(Cont) of {Records, '$end_of_table'} -> - next(list(Records)); + next(Records); {Records, NCont} -> - next(chain(list(Records), ets(NCont, ContF))); + next(chain(Records, ets(NCont, ContF))); '$end_of_table' -> [] end diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index fe340d3ee..a0787dd1d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,72 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +filter_test() -> + S = emqx_utils_stream:filter( + fun(N) -> N rem 2 =:= 0 end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [2, 4, 6], + emqx_utils_stream:consume(S) + ). + +drop_test() -> + S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])), + ?assertEqual( + [3, 4, 5], + emqx_utils_stream:consume(S) + ). + +foreach_test() -> + Self = self(), + ok = emqx_utils_stream:foreach( + fun(N) -> erlang:send(Self, N) end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(100)) + ). + +chainmap_test() -> + S = emqx_utils_stream:chainmap( + fun(N) -> + case N rem 2 of + 1 -> + emqx_utils_stream:chain( + emqx_utils_stream:chain(emqx_utils_stream:list([N]), []), + emqx_utils_stream:list([N + 1]) + ); + 0 -> + emqx_utils_stream:empty() + end + end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(S) + ). + transpose_test() -> S = emqx_utils_stream:transpose([ emqx_utils_stream:list([1, 2, 3]), @@ -173,6 +239,24 @@ interleave_stop_test() -> emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false)) ). +ets_test() -> + T = ets:new(tab, [ordered_set]), + Objects = [{N, N} || N <- lists:seq(1, 10)], + lists:foreach( + fun(Object) -> ets:insert(T, Object) end, + Objects + ), + S = emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(T, '_', 4); + (Cont) -> ets:match_object(Cont) + end + ), + ?assertEqual( + Objects, + emqx_utils_stream:consume(S) + ). + csv_test() -> Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual(