diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 79ce5ce7b..21321400d 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -20,6 +20,7 @@ -export([ empty/0, list/1, + mqueue/1, map/2, chain/2 ]). @@ -59,6 +60,18 @@ list([]) -> list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream out of process message queue. +-spec mqueue(timeout()) -> stream(any()). +mqueue(Timeout) -> + fun() -> + receive + X -> + [X | mqueue(Timeout)] + after Timeout -> + [] + end + end. + %% @doc Make a stream by applying a function to each element of the underlying stream. -spec map(fun((X) -> Y), stream(X)) -> stream(Y). map(F, S) -> diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 4a48ae45d..ef8185a94 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -73,3 +73,12 @@ chain_list_map_test() -> ["1", "2", "3", "4", "5", "6"], emqx_utils_stream:consume(S) ). + +mqueue_test() -> + _ = erlang:send_after(1, self(), 1), + _ = erlang:send_after(100, self(), 2), + _ = erlang:send_after(20, self(), 42), + ?assertEqual( + [1, 42, 2], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) + ).