From d30c99512a43d2b52a31c542e6a7e4326bcb4eec Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 21:14:02 +0100 Subject: [PATCH] feat(utils-stream): add a few more stream combinators --- apps/emqx_utils/src/emqx_utils_stream.erl | 42 ++++++++++- .../test/emqx_utils_stream_tests.erl | 74 +++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 791a4de16..fac536532 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -22,7 +22,9 @@ list/1, mqueue/1, map/2, - chain/2 + transpose/1, + chain/2, + repeat/1 ]). %% Evaluating @@ -91,6 +93,31 @@ map(F, S) -> end end. +%% @doc Transpose a list of streams into a stream producing lists of their respective values. +%% The resulting stream is as long as the shortest of the input streams. +-spec transpose([stream(X)]) -> stream([X]). +transpose([S]) -> + map(fun(X) -> [X] end, S); +transpose([S | Streams]) -> + transpose_tail(S, transpose(Streams)); +transpose([]) -> + empty(). + +transpose_tail(S, Tail) -> + fun() -> + case next(S) of + [X | SRest] -> + case next(Tail) of + [Xs | TailRest] -> + [[X | Xs] | transpose_tail(SRest, TailRest)]; + [] -> + [] + end; + [] -> + [] + end + end. + %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). @@ -104,6 +131,19 @@ chain(SFirst, SThen) -> end end. +%% @doc Make an infinite stream out of repeats of given stream. +%% If the given stream is empty, the resulting stream is also empty. +-spec repeat(stream(X)) -> stream(X). +repeat(S) -> + fun() -> + case next(S) of + [X | SRest] -> + [X | chain(SRest, repeat(S))]; + [] -> + [] + end + end. + %% %% @doc Produce the next value from the stream. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 814d6fe0d..60b67a4ff 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,80 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +transpose_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [3, 6]], + emqx_utils_stream:consume(S) + ). + +transpose_none_test() -> + ?assertEqual( + [], + emqx_utils_stream:consume(emqx_utils_stream:transpose([])) + ). + +transpose_one_test() -> + S = emqx_utils_stream:transpose([emqx_utils_stream:list([1, 2, 3])]), + ?assertEqual( + [[1], [2], [3]], + emqx_utils_stream:consume(S) + ). + +transpose_many_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:list([8, 9]) + ]), + ?assertEqual( + [[1, 4, 8], [2, 5, 9]], + emqx_utils_stream:consume(S) + ). + +transpose_many_empty_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:empty() + ]), + ?assertEqual( + [], + emqx_utils_stream:consume(S) + ). + +repeat_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])), + ?assertMatch( + {[1, 2, 3, 1, 2, 3, 1, 2], _}, + emqx_utils_stream:consume(8, S) + ), + {_, SRest} = emqx_utils_stream:consume(8, S), + ?assertMatch( + {[3, 1, 2, 3, 1, 2, 3, 1], _}, + emqx_utils_stream:consume(8, SRest) + ). + +repeat_empty_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([])), + ?assertEqual( + [], + emqx_utils_stream:consume(8, S) + ). + +transpose_repeat_test() -> + S = emqx_utils_stream:transpose([ + emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])), + emqx_utils_stream:list([4, 5, 6, 7, 8]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]], + emqx_utils_stream:consume(S) + ). + mqueue_test() -> _ = erlang:send_after(1, self(), 1), _ = erlang:send_after(100, self(), 2),