feat(utils-stream): add a few more stream combinators

This commit is contained in:
Andrew Mayorov 2024-02-27 21:14:02 +01:00
parent 574746b190
commit 41f1fd8ebc
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 115 additions and 1 deletions

View File

@ -22,7 +22,9 @@
list/1,
mqueue/1,
map/2,
chain/2
zip/1,
chain/2,
repeat/1
]).
%% Evaluating
@ -91,6 +93,31 @@ map(F, S) ->
end
end.
%% @doc Zip a list of streams into a stream producing lists of their respective values.
%% The resulting stream is as long as the shortest of the input streams.
-spec zip([stream(X)]) -> stream([X]).
zip([S]) ->
map(fun(X) -> [X] end, S);
zip([S | Streams]) ->
ziptail(S, zip(Streams));
zip([]) ->
empty().
ziptail(S, Tail) ->
fun() ->
case next(S) of
[X | SRest] ->
case next(Tail) of
[Xs | TailRest] ->
[[X | Xs] | ziptail(SRest, TailRest)];
[] ->
[]
end;
[] ->
[]
end
end.
%% @doc Make a stream by chaining (concatenating) two streams.
%% The second stream begins to produce values only after the first one is exhausted.
-spec chain(stream(X), stream(Y)) -> stream(X | Y).
@ -104,6 +131,19 @@ chain(SFirst, SThen) ->
end
end.
%% @doc Make an infinite stream out of repeats of given stream.
%% If the given stream is empty, the resulting stream is also empty.
-spec repeat(stream(X)) -> stream(X).
repeat(S) ->
fun() ->
case next(S) of
[X | SRest] ->
[X | chain(SRest, repeat(S))];
[] ->
[]
end
end.
%%
%% @doc Produce the next value from the stream.

View File

@ -74,6 +74,80 @@ chain_list_map_test() ->
emqx_utils_stream:consume(S)
).
zip_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7])
]),
?assertEqual(
[[1, 4], [2, 5], [3, 6]],
emqx_utils_stream:consume(S)
).
zip_none_test() ->
?assertEqual(
[],
emqx_utils_stream:consume(emqx_utils_stream:zip([]))
).
zip_one_test() ->
S = emqx_utils_stream:zip([emqx_utils_stream:list([1, 2, 3])]),
?assertEqual(
[[1], [2], [3]],
emqx_utils_stream:consume(S)
).
zip_many_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7]),
emqx_utils_stream:list([8, 9])
]),
?assertEqual(
[[1, 4, 8], [2, 5, 9]],
emqx_utils_stream:consume(S)
).
zip_many_empty_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:list([1, 2, 3]),
emqx_utils_stream:list([4, 5, 6, 7]),
emqx_utils_stream:empty()
]),
?assertEqual(
[],
emqx_utils_stream:consume(S)
).
repeat_test() ->
S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])),
?assertMatch(
{[1, 2, 3, 1, 2, 3, 1, 2], _},
emqx_utils_stream:consume(8, S)
),
{_, SRest} = emqx_utils_stream:consume(8, S),
?assertMatch(
{[3, 1, 2, 3, 1, 2, 3, 1], _},
emqx_utils_stream:consume(8, SRest)
).
repeat_empty_test() ->
S = emqx_utils_stream:repeat(emqx_utils_stream:list([])),
?assertEqual(
[],
emqx_utils_stream:consume(8, S)
).
zip_repeat_test() ->
S = emqx_utils_stream:zip([
emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])),
emqx_utils_stream:list([4, 5, 6, 7, 8])
]),
?assertEqual(
[[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]],
emqx_utils_stream:consume(S)
).
mqueue_test() ->
_ = erlang:send_after(1, self(), 1),
_ = erlang:send_after(100, self(), 2),