chore(streams): add stream methods, optimize streams

This commit is contained in:
Ilya Averyanov 2024-05-15 18:57:28 +03:00
parent 552b62236c
commit e1ce6377f3
3 changed files with 139 additions and 26 deletions

View File

@ -449,14 +449,8 @@ group_match_spec(UserGroup, QString) ->
%% parse import file/data %% parse import file/data
parse_import_users(Filename, FileData, Convertor) -> parse_import_users(Filename, FileData, Convertor) ->
Eval = fun _Eval(F) -> UserStream = reader_fn(Filename, FileData),
case F() of Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)),
[] -> [];
[User | F1] -> [Convertor(User) | _Eval(F1)]
end
end,
ReaderFn = reader_fn(Filename, FileData),
Users = Eval(ReaderFn),
NewUsersCount = NewUsersCount =
lists:foldl( lists:foldl(
fun( fun(

View File

@ -23,21 +23,23 @@
const/1, const/1,
mqueue/1, mqueue/1,
map/2, map/2,
filter/2,
foreach/2,
transpose/1, transpose/1,
chain/1, chain/1,
chain/2, chain/2,
repeat/1, repeat/1,
interleave/2, interleave/2,
limit_length/2 limit_length/2,
filter/2,
drop/2,
chainmap/2
]). ]).
%% Evaluating %% Evaluating
-export([ -export([
next/1, next/1,
consume/1, consume/1,
consume/2 consume/2,
foreach/2
]). ]).
%% Streams from ETS tables %% Streams from ETS tables
@ -53,8 +55,9 @@
-export_type([stream/1]). -export_type([stream/1]).
%% @doc A stream is essentially a lazy list. %% @doc A stream is essentially a lazy list.
-type stream(T) :: fun(() -> next(T) | []). -type stream_tail(T) :: fun(() -> next(T) | []).
-type next(T) :: nonempty_improper_list(T, stream(T)). -type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T).
-type next(T) :: nonempty_improper_list(T, stream_tail(T)).
-dialyzer(no_improper_lists). -dialyzer(no_improper_lists).
@ -65,15 +68,12 @@
%% @doc Make a stream that produces no values. %% @doc Make a stream that produces no values.
-spec empty() -> stream(none()). -spec empty() -> stream(none()).
empty() -> empty() ->
fun() -> [] end. [].
%% @doc Make a stream out of the given list. %% @doc Make a stream out of the given list.
%% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`.
-spec list([T]) -> stream(T). -spec list([T]) -> stream(T).
list([]) -> list(L) -> L.
empty();
list([X | Rest]) ->
fun() -> [X | list(Rest)] end.
%% @doc Make a stream with a single element infinitely repeated %% @doc Make a stream with a single element infinitely repeated
-spec const(T) -> stream(T). -spec const(T) -> stream(T).
@ -107,7 +107,7 @@ map(F, S) ->
%% @doc Make a stream by filtering the underlying stream with a predicate function. %% @doc Make a stream by filtering the underlying stream with a predicate function.
filter(F, S) -> filter(F, S) ->
FilterNext = fun FilterNext(St) -> FilterNext = fun FilterNext(St) ->
case emqx_utils_stream:next(St) of case next(St) of
[X | Rest] -> [X | Rest] ->
case F(X) of case F(X) of
true -> true ->
@ -123,7 +123,7 @@ filter(F, S) ->
%% @doc Consumes the stream and applies the given function to each element. %% @doc Consumes the stream and applies the given function to each element.
foreach(F, S) -> foreach(F, S) ->
case emqx_utils_stream:next(S) of case next(S) of
[X | Rest] -> [X | Rest] ->
F(X), F(X),
foreach(F, Rest); foreach(F, Rest);
@ -131,6 +131,37 @@ foreach(F, S) ->
ok ok
end. end.
%% @doc Make a stream that skips the first `N` elements of the stream `S`
%% and then produces whatever `S` produces afterwards.
%% Nothing is pulled from `S` until the resulting stream is evaluated.
%% If `S` ends before `N` elements were skipped, the result is empty.
-spec drop(non_neg_integer(), stream(T)) -> stream(T).
drop(N, S) ->
    Skip = fun
        Skip(Left, Stream) when Left > 0 ->
            %% Still have elements to discard: pull one and throw it away.
            case next(Stream) of
                [_Discarded | Tail] ->
                    Skip(Left - 1, Tail);
                Exhausted ->
                    %% Underlying stream ended early, hand its result back as is.
                    Exhausted
            end;
        Skip(_Left, Stream) ->
            %% Nothing left to discard: the next value is the first one to keep.
            next(Stream)
    end,
    fun() -> Skip(N, S) end.
%% @doc Stream counterpart of flatmap: apply `F` to each element of `S`,
%% where `F` returns a stream, and produce the values of those streams
%% one after another.
%% Elements for which `F` returns an empty stream contribute nothing.
-spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y).
chainmap(F, S) ->
    Step = fun Step(Outer) ->
        case next(Outer) of
            [] ->
                %% Input stream is exhausted, so is the result.
                [];
            [Elem | OuterRest] ->
                case next(F(Elem)) of
                    [] ->
                        %% Nothing produced for this element, try the next one.
                        Step(OuterRest);
                    [First | InnerRest] ->
                        %% Emit the first produced value; the rest of this
                        %% sub-stream is chained before the mapping of the
                        %% remaining input.
                        [First | chain(InnerRest, chainmap(F, OuterRest))]
                end
        end
    end,
    fun() -> Step(S) end.
%% @doc Transpose a list of streams into a stream producing lists of their respective values. %% @doc Transpose a list of streams into a stream producing lists of their respective values.
%% The resulting stream is as long as the shortest of the input streams. %% The resulting stream is as long as the shortest of the input streams.
-spec transpose([stream(X)]) -> stream([X]). -spec transpose([stream(X)]) -> stream([X]).
@ -201,7 +232,7 @@ repeat(S) ->
interleave(L0, ContinueAtEmpty) -> interleave(L0, ContinueAtEmpty) ->
L = lists:map( L = lists:map(
fun fun
(Stream) when is_function(Stream) -> (Stream) when is_function(Stream) or is_list(Stream) ->
{1, Stream}; {1, Stream};
(A = {N, _}) when N >= 0 -> (A = {N, _}) when N >= 0 ->
A A
@ -230,8 +261,12 @@ limit_length(N, S) when N >= 0 ->
%% @doc Produce the next value from the stream. %% @doc Produce the next value from the stream.
-spec next(stream(T)) -> next(T) | []. -spec next(stream(T)) -> next(T) | [].
next(S) -> next(EvalNext) when is_function(EvalNext) ->
S(). EvalNext();
next([_ | _Rest] = EvaluatedNext) ->
EvaluatedNext;
next([]) ->
[].
%% @doc Consume the stream and return a list of all produced values. %% @doc Consume the stream and return a list of all produced values.
-spec consume(stream(T)) -> [T]. -spec consume(stream(T)) -> [T].
@ -279,9 +314,9 @@ ets(Cont, ContF) ->
fun() -> fun() ->
case ContF(Cont) of case ContF(Cont) of
{Records, '$end_of_table'} -> {Records, '$end_of_table'} ->
next(list(Records)); next(Records);
{Records, NCont} -> {Records, NCont} ->
next(chain(list(Records), ets(NCont, ContF))); next(chain(Records, ets(NCont, ContF)));
'$end_of_table' -> '$end_of_table' ->
[] []
end end

View File

@ -74,6 +74,72 @@ chain_list_map_test() ->
emqx_utils_stream:consume(S) emqx_utils_stream:consume(S)
). ).
%% filter/2 keeps only the elements satisfying the predicate, including
%% across the boundaries of chained (and empty) streams.
filter_test() ->
    IsEven = fun(N) -> N rem 2 =:= 0 end,
    Tail = emqx_utils_stream:chain(
        emqx_utils_stream:empty(),
        emqx_utils_stream:list([4, 5, 6])
    ),
    Input = emqx_utils_stream:chain(emqx_utils_stream:list([1, 2, 3]), Tail),
    S = emqx_utils_stream:filter(IsEven, Input),
    ?assertEqual(
        [2, 4, 6],
        emqx_utils_stream:consume(S)
    ).
%% drop/2 skips the requested number of leading elements.
drop_test() ->
    Source = emqx_utils_stream:list([1, 2, 3, 4, 5]),
    Remaining = emqx_utils_stream:drop(2, Source),
    ?assertEqual(
        [3, 4, 5],
        emqx_utils_stream:consume(Remaining)
    ).
%% foreach/2 applies the function to every element in order and returns `ok`.
%% Each element is forwarded to the test process as a message so the
%% visiting order can be checked afterwards.
foreach_test() ->
    Receiver = self(),
    Forward = fun(N) -> Receiver ! N end,
    Tail = emqx_utils_stream:chain(
        emqx_utils_stream:empty(),
        emqx_utils_stream:list([4, 5, 6])
    ),
    Input = emqx_utils_stream:chain(emqx_utils_stream:list([1, 2, 3]), Tail),
    ok = emqx_utils_stream:foreach(Forward, Input),
    %% NOTE(review): mqueue(100) presumably streams this process' mailbox
    %% with a 100 ms receive timeout -- confirm against mqueue/1.
    Received = emqx_utils_stream:consume(emqx_utils_stream:mqueue(100)),
    ?assertEqual(
        [1, 2, 3, 4, 5, 6],
        Received
    ).
%% chainmap/2 concatenates the streams produced for each input element.
%% Odd N expands to the two-element stream N, N + 1; even N expands to an
%% empty stream, so mapping over 1..6 yields 1..6 again.
chainmap_test() ->
    Expand = fun(N) ->
        case N rem 2 of
            0 ->
                emqx_utils_stream:empty();
            1 ->
                %% A plain `[]` is chained in on purpose to exercise
                %% lists being accepted as streams.
                Head = emqx_utils_stream:chain(emqx_utils_stream:list([N]), []),
                emqx_utils_stream:chain(Head, emqx_utils_stream:list([N + 1]))
        end
    end,
    Tail = emqx_utils_stream:chain(
        emqx_utils_stream:empty(),
        emqx_utils_stream:list([4, 5, 6])
    ),
    Input = emqx_utils_stream:chain(emqx_utils_stream:list([1, 2, 3]), Tail),
    S = emqx_utils_stream:chainmap(Expand, Input),
    ?assertEqual(
        [1, 2, 3, 4, 5, 6],
        emqx_utils_stream:consume(S)
    ).
transpose_test() -> transpose_test() ->
S = emqx_utils_stream:transpose([ S = emqx_utils_stream:transpose([
emqx_utils_stream:list([1, 2, 3]), emqx_utils_stream:list([1, 2, 3]),
@ -173,6 +239,24 @@ interleave_stop_test() ->
emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false)) emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false))
). ).
%% ets/1 turns a paged ETS traversal into a stream.
%% Ten objects are read in pages of at most four, so the stream has to
%% follow continuations across several pages before reaching the end of
%% the table.
ets_test() ->
    T = ets:new(tab, [ordered_set]),
    try
        Objects = [{N, N} || N <- lists:seq(1, 10)],
        lists:foreach(
            fun(Object) -> ets:insert(T, Object) end,
            Objects
        ),
        S = emqx_utils_stream:ets(
            fun
                %% First call: start the traversal with a page size of 4.
                (undefined) -> ets:match_object(T, '_', 4);
                %% Subsequent calls: resume from the previous continuation.
                (Cont) -> ets:match_object(Cont)
            end
        ),
        %% The stream is lazy, so it must be consumed while the table exists.
        ?assertEqual(
            Objects,
            emqx_utils_stream:consume(S)
        )
    after
        %% Release the table even when the assertion fails, instead of
        %% leaving it alive for as long as the owning test process lives.
        ets:delete(T)
    end.
csv_test() -> csv_test() ->
Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
?assertEqual( ?assertEqual(