diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 892a4e5ba..1aebb1b21 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -479,7 +479,7 @@ select_v2(_Spec, Limit, Stream) -> select_next(Limit, Stream). select_next(N, Stream) -> - case emqx_utils_stream:take(N, Stream) of + case emqx_utils_stream:consume(N, Stream) of {Routes, SRest} -> {Routes, SRest}; Routes -> diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index e7374c861..79ce5ce7b 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -27,8 +27,8 @@ %% Evaluating -export([ next/1, - take/2, - consume/1 + consume/1, + consume/2 ]). %% Streams from ETS tables @@ -46,16 +46,20 @@ %% +%% @doc Make a stream that produces no values. -spec empty() -> stream(none()). empty() -> fun() -> [] end. +%% @doc Make a stream out of the given list. +%% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. -spec list([T]) -> stream(T). list([]) -> empty(); list([X | Rest]) -> fun() -> [X | list(Rest)] end. +%% @doc Make a stream by applying a function to each element of the underlying stream. -spec map(fun((X) -> Y), stream(X)) -> stream(Y). map(F, S) -> fun() -> @@ -67,6 +71,8 @@ map(F, S) -> end end. +%% @doc Make a stream by chaining (concatenating) two streams. +%% The second stream begins to produce values only after the first one is exhausted. -spec chain(stream(X), stream(Y)) -> stream(X | Y). chain(SFirst, SThen) -> fun() -> @@ -80,24 +86,12 @@ chain(SFirst, SThen) -> %% +%% @doc Produce the next value from the stream. -spec next(stream(T)) -> next(T) | []. next(S) -> S(). --spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. -take(N, S) -> - take(N, S, []). - -take(0, S, Acc) -> - {lists:reverse(Acc), S}; -take(N, S, Acc) -> - case next(S) of - [X | SRest] -> - take(N - 1, SRest, [X | Acc]); - [] -> - lists:reverse(Acc) - end. - +%% @doc Consume the stream and return a list of all produced values. -spec consume(stream(T)) -> [T]. consume(S) -> case next(S) of @@ -107,6 +101,22 @@ consume(S) -> [] end. +%% @doc Consume N values from the stream and return a list of them and the rest of the stream. +%% If the stream is exhausted before N values are produced, return just a list of these values. +-spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T]. +consume(N, S) -> + consume(N, S, []). + +consume(0, S, Acc) -> + {lists:reverse(Acc), S}; +consume(N, S, Acc) -> + case next(S) of + [X | SRest] -> + consume(N - 1, SRest, [X | Acc]); + [] -> + lists:reverse(Acc) + end. + %% -type select_result(Record, Cont) :: @@ -114,6 +124,11 @@ consume(S) -> | {[Record], '$end_of_table'} | '$end_of_table'. +%% @doc Make a stream out of an ETS table, where the ETS table is scanned through in chunks, +%% with the given continuation function. The function is assumed to return a result of a call to: +%% * `ets:select/1` / `ets:select/3` +%% * `ets:match/1` / `ets:match/3` +%% * `ets:match_object/1` / `ets:match_object/3` -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). ets(ContF) -> ets(undefined, ContF). diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 0f98bae21..4a48ae45d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -50,12 +50,12 @@ chain_take_test() -> ), ?assertMatch( {[1, 2, 3, 4, 5], _SRest}, - emqx_utils_stream:take(5, S) + emqx_utils_stream:consume(5, S) ), - {_, SRest} = emqx_utils_stream:take(5, S), + {_, SRest} = emqx_utils_stream:consume(5, S), ?assertEqual( [6, 7, 8], - emqx_utils_stream:take(5, SRest) + emqx_utils_stream:consume(5, SRest) ). chain_list_map_test() ->