From 41f1fd8ebce3a0bf3c66d474513013a0b42d88f5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 27 Feb 2024 21:14:02 +0100 Subject: [PATCH] feat(utils-stream): add a few more stream combinators --- apps/emqx_utils/src/emqx_utils_stream.erl | 42 ++++++++++- .../test/emqx_utils_stream_tests.erl | 74 +++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 5fd3515ad..693be74f9 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -22,7 +22,9 @@ list/1, mqueue/1, map/2, - chain/2 + zip/1, + chain/2, + repeat/1 ]). %% Evaluating @@ -91,6 +93,31 @@ map(F, S) -> end end. +%% @doc Zip a list of streams into a stream producing lists of their respective values. +%% The resulting stream is as long as the shortest of the input streams. +-spec zip([stream(X)]) -> stream([X]). +zip([S]) -> + map(fun(X) -> [X] end, S); +zip([S | Streams]) -> + ziptail(S, zip(Streams)); +zip([]) -> + empty(). + +ziptail(S, Tail) -> + fun() -> + case next(S) of + [X | SRest] -> + case next(Tail) of + [Xs | TailRest] -> + [[X | Xs] | ziptail(SRest, TailRest)]; + [] -> + [] + end; + [] -> + [] + end + end. + %% @doc Make a stream by chaining (concatenating) two streams. %% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). @@ -104,6 +131,19 @@ chain(SFirst, SThen) -> end end. +%% @doc Make an infinite stream out of repeats of given stream. +%% If the given stream is empty, the resulting stream is also empty. +-spec repeat(stream(X)) -> stream(X). +repeat(S) -> + fun() -> + case next(S) of + [X | SRest] -> + [X | chain(SRest, repeat(S))]; + [] -> + [] + end + end. + %% %% @doc Produce the next value from the stream. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 89ca92d20..6ec661e33 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,80 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +zip_test() -> + S = emqx_utils_stream:zip([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [3, 6]], + emqx_utils_stream:consume(S) + ). + +zip_none_test() -> + ?assertEqual( + [], + emqx_utils_stream:consume(emqx_utils_stream:zip([])) + ). + +zip_one_test() -> + S = emqx_utils_stream:zip([emqx_utils_stream:list([1, 2, 3])]), + ?assertEqual( + [[1], [2], [3]], + emqx_utils_stream:consume(S) + ). + +zip_many_test() -> + S = emqx_utils_stream:zip([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:list([8, 9]) + ]), + ?assertEqual( + [[1, 4, 8], [2, 5, 9]], + emqx_utils_stream:consume(S) + ). + +zip_many_empty_test() -> + S = emqx_utils_stream:zip([ + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:list([4, 5, 6, 7]), + emqx_utils_stream:empty() + ]), + ?assertEqual( + [], + emqx_utils_stream:consume(S) + ). + +repeat_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2, 3])), + ?assertMatch( + {[1, 2, 3, 1, 2, 3, 1, 2], _}, + emqx_utils_stream:consume(8, S) + ), + {_, SRest} = emqx_utils_stream:consume(8, S), + ?assertMatch( + {[3, 1, 2, 3, 1, 2, 3, 1], _}, + emqx_utils_stream:consume(8, SRest) + ). + +repeat_empty_test() -> + S = emqx_utils_stream:repeat(emqx_utils_stream:list([])), + ?assertEqual( + [], + emqx_utils_stream:consume(8, S) + ). + +zip_repeat_test() -> + S = emqx_utils_stream:zip([ + emqx_utils_stream:repeat(emqx_utils_stream:list([1, 2])), + emqx_utils_stream:list([4, 5, 6, 7, 8]) + ]), + ?assertEqual( + [[1, 4], [2, 5], [1, 6], [2, 7], [1, 8]], + emqx_utils_stream:consume(S) + ). + mqueue_test() -> _ = erlang:send_after(1, self(), 1), _ = erlang:send_after(100, self(), 2),