fix(utils): rename `emqx_utils_stream:take/2` to `consume/2`

Which is more neutral and harder to confuse with a destructive `take` in
collections.
This commit is contained in:
Andrew Mayorov 2023-11-15 17:20:40 +07:00
parent 6812ee9d0f
commit 8919b08207
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 35 additions and 20 deletions

View File

@ -479,7 +479,7 @@ select_v2(_Spec, Limit, Stream) ->
select_next(Limit, Stream).
select_next(N, Stream) ->
case emqx_utils_stream:take(N, Stream) of
case emqx_utils_stream:consume(N, Stream) of
{Routes, SRest} ->
{Routes, SRest};
Routes ->

View File

@ -27,8 +27,8 @@
%% Evaluating
-export([
next/1,
take/2,
consume/1
consume/1,
consume/2
]).
%% Streams from ETS tables
@ -46,16 +46,20 @@
%%
%% @doc Make a stream that produces no values.
-spec empty() -> stream(none()).
empty() ->
fun() -> [] end.
%% @doc Make a stream out of the given list.
%% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`.
-spec list([T]) -> stream(T).
list([]) ->
empty();
list([X | Rest]) ->
fun() -> [X | list(Rest)] end.
%% @doc Make a stream by applying a function to each element of the underlying stream.
-spec map(fun((X) -> Y), stream(X)) -> stream(Y).
map(F, S) ->
fun() ->
@ -67,6 +71,8 @@ map(F, S) ->
end
end.
%% @doc Make a stream by chaining (concatenating) two streams.
%% The second stream begins to produce values only after the first one is exhausted.
-spec chain(stream(X), stream(Y)) -> stream(X | Y).
chain(SFirst, SThen) ->
fun() ->
@ -80,24 +86,12 @@ chain(SFirst, SThen) ->
%%
%% @doc Produce the next value from the stream.
-spec next(stream(T)) -> next(T) | [].
next(S) ->
S().
-spec take(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
take(N, S) ->
take(N, S, []).
take(0, S, Acc) ->
{lists:reverse(Acc), S};
take(N, S, Acc) ->
case next(S) of
[X | SRest] ->
take(N - 1, SRest, [X | Acc]);
[] ->
lists:reverse(Acc)
end.
%% @doc Consume the stream and return a list of all produced values.
-spec consume(stream(T)) -> [T].
consume(S) ->
case next(S) of
@ -107,6 +101,22 @@ consume(S) ->
[]
end.
%% @doc Consume N values from the stream and return a list of them and the rest of the stream.
%% If the stream is exhausted before N values are produced, return just a list of these values.
-spec consume(non_neg_integer(), stream(T)) -> {[T], stream(T)} | [T].
consume(N, S) ->
consume(N, S, []).
consume(0, S, Acc) ->
{lists:reverse(Acc), S};
consume(N, S, Acc) ->
case next(S) of
[X | SRest] ->
consume(N - 1, SRest, [X | Acc]);
[] ->
lists:reverse(Acc)
end.
%%
-type select_result(Record, Cont) ::
@ -114,6 +124,11 @@ consume(S) ->
| {[Record], '$end_of_table'}
| '$end_of_table'.
%% @doc Make a stream out of an ETS table, where the ETS table is scanned through in chunks,
%% with the given continuation function. The function is assumed to return a result of a call to:
%% * `ets:select/1` / `ets:select/3`
%% * `ets:match/1` / `ets:match/3`
%% * `ets:match_object/1` / `ets:match_object/3`
-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
ets(ContF) ->
ets(undefined, ContF).

View File

@ -50,12 +50,12 @@ chain_take_test() ->
),
?assertMatch(
{[1, 2, 3, 4, 5], _SRest},
emqx_utils_stream:take(5, S)
emqx_utils_stream:consume(5, S)
),
{_, SRest} = emqx_utils_stream:take(5, S),
{_, SRest} = emqx_utils_stream:consume(5, S),
?assertEqual(
[6, 7, 8],
emqx_utils_stream:take(5, SRest)
emqx_utils_stream:consume(5, SRest)
).
chain_list_map_test() ->