Merge pull request #13025 from savonarola/0513-fix-cursor-usage

Get rid of qlc in retainer
Ilia Averianov 2024-05-16 17:06:53 +03:00 committed by GitHub
commit 22d5f17de8
6 changed files with 304 additions and 214 deletions

View File

@@ -449,14 +449,8 @@ group_match_spec(UserGroup, QString) ->
 
 %% parse import file/data
 parse_import_users(Filename, FileData, Convertor) ->
-    Eval = fun _Eval(F) ->
-        case F() of
-            [] -> [];
-            [User | F1] -> [Convertor(User) | _Eval(F1)]
-        end
-    end,
-    ReaderFn = reader_fn(Filename, FileData),
-    Users = Eval(ReaderFn),
+    UserStream = reader_fn(Filename, FileData),
+    Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)),
     NewUsersCount =
         lists:foldl(
             fun(
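For context, the replacement relies on the map/consume combination from emqx_utils_stream: map/2 wraps the stream lazily and consume/1 forces it into a list in one pass. A minimal stand-alone sketch of that pattern (the user maps, the Convertor fun, and the function name are made up for illustration):

%% Hypothetical illustration of the map/consume pattern used above.
example_convert_users() ->
    %% list/1 turns a plain list into a stream
    UserStream = emqx_utils_stream:list([#{name => <<"u1">>}, #{name => <<"u2">>}]),
    Convertor = fun(#{name := Name}) -> {Name, enabled} end,
    %% map/2 is lazy; consume/1 drives the stream and collects the results
    [{<<"u1">>, enabled}, {<<"u2">>, enabled}] =
        emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)).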

View File

@@ -23,7 +23,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
--include_lib("stdlib/include/qlc.hrl").
 
 %% emqx_retainer callbacks
 -export([
@@ -64,12 +63,13 @@
 -define(META_KEY, index_meta).
 
--define(CLEAR_BATCH_SIZE, 1000).
 -define(REINDEX_BATCH_SIZE, 1000).
 -define(REINDEX_DISPATCH_WAIT, 30000).
 -define(REINDEX_RPC_RETRY_INTERVAL, 1000).
 -define(REINDEX_INDEX_UPDATE_WAIT, 30000).
 
+-define(MESSAGE_SCAN_BATCH_SIZE, 100).
+
 %%--------------------------------------------------------------------
 %% Management API
 %%--------------------------------------------------------------------
@@ -177,15 +177,20 @@ clear_expired(_) ->
 
 clear_expired() ->
     NowMs = erlang:system_time(millisecond),
-    QH = qlc:q([
-        RetainedMsg
-     || #retained_message{
-            expiry_time = ExpiryTime
-        } = RetainedMsg <- ets:table(?TAB_MESSAGE),
-        (ExpiryTime =/= 0) and (ExpiryTime < NowMs)
-    ]),
-    QC = qlc:cursor(QH),
-    clear_batch(dirty_indices(write), QC).
+    S0 = ets_stream(?TAB_MESSAGE),
+    S1 = emqx_utils_stream:filter(
+        fun(#retained_message{expiry_time = ExpiryTime}) ->
+            ExpiryTime =/= 0 andalso ExpiryTime < NowMs
+        end,
+        S0
+    ),
+    DirtyWriteIndices = dirty_indices(write),
+    emqx_utils_stream:foreach(
+        fun(RetainedMsg) ->
+            delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
+        end,
+        S1
+    ).
 
 delete_message(_State, Topic) ->
     Tokens = topic_to_tokens(Topic),
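The new clear_expired/0 builds a lazy pipeline instead of a QLC cursor: an ETS-backed stream is filtered by expiry time and then traversed with foreach/2, so only one scan batch is held at a time. A rough stand-alone sketch of the same shape (the records and the deletion side effect are stand-ins, not the retainer's):

%% Sketch only: filter a stream and apply a side effect per element.
clear_expired_sketch(NowMs) ->
    S0 = emqx_utils_stream:list([{msg, 1, 0}, {msg, 2, NowMs - 1}, {msg, 3, NowMs + 1000}]),
    Expired = emqx_utils_stream:filter(
        fun({msg, _Id, ExpiryTime}) -> ExpiryTime =/= 0 andalso ExpiryTime < NowMs end,
        S0
    ),
    %% foreach/2 consumes the stream purely for its side effects
    emqx_utils_stream:foreach(fun({msg, Id, _}) -> io:format("deleting ~p~n", [Id]) end, Expired).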
@@ -193,13 +198,11 @@ delete_message(_State, Topic) ->
         false ->
             ok = delete_message_by_topic(Tokens, dirty_indices(write));
         true ->
-            QH = search_table(Tokens, 0),
-            qlc:fold(
-                fun(RetainedMsg, _) ->
-                    ok = delete_message_with_indices(RetainedMsg, dirty_indices(write))
-                end,
-                undefined,
-                QH
+            S = search_stream(Tokens, 0),
+            DirtyWriteIndices = dirty_indices(write),
+            emqx_utils_stream:foreach(
+                fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end,
+                S
             )
     end.
@@ -209,59 +212,48 @@ read_message(_State, Topic) ->
 
 match_messages(State, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
-    QH = msg_table(search_table(Tokens, Now)),
+    S = msg_stream(search_stream(Tokens, Now)),
     case batch_read_number() of
         all_remaining ->
-            {ok, qlc:eval(QH), undefined};
+            {ok, emqx_utils_stream:consume(S), undefined};
         BatchNum when is_integer(BatchNum) ->
-            Cursor = qlc:cursor(QH),
-            match_messages(State, Topic, {Cursor, BatchNum})
+            match_messages(State, Topic, {S, BatchNum})
     end;
-match_messages(_State, _Topic, {Cursor, BatchNum}) ->
-    case qlc_next_answers(Cursor, BatchNum) of
-        {closed, Rows} ->
-            {ok, Rows, undefined};
-        {more, Rows} ->
-            {ok, Rows, {Cursor, BatchNum}}
+match_messages(_State, _Topic, {S0, BatchNum}) ->
+    case emqx_utils_stream:consume(BatchNum, S0) of
+        {Rows, S1} ->
+            {ok, Rows, {S1, BatchNum}};
+        Rows when is_list(Rows) ->
+            {ok, Rows, undefined}
     end.
 
-delete_cursor(_State, {Cursor, _}) ->
-    qlc:delete_cursor(Cursor);
-delete_cursor(_State, undefined) ->
+delete_cursor(_State, _Cursor) ->
     ok.
 
 page_read(_State, Topic, Page, Limit) ->
     Now = erlang:system_time(millisecond),
-    QH =
+    S0 =
         case Topic of
             undefined ->
-                msg_table(search_table(undefined, ['#'], Now));
+                msg_stream(search_stream(undefined, ['#'], Now));
             _ ->
                 Tokens = topic_to_tokens(Topic),
-                msg_table(search_table(Tokens, Now))
+                msg_stream(search_stream(Tokens, Now))
         end,
-    OrderedQH = qlc:sort(QH, {order, fun compare_message/2}),
-    Cursor = qlc:cursor(OrderedQH),
+    %% This is very inefficient, but we are limited with inherited API
+    S1 = emqx_utils_stream:list(
+        lists:sort(
+            fun compare_message/2,
+            emqx_utils_stream:consume(S0)
+        )
+    ),
     NSkip = (Page - 1) * Limit,
-    SkipResult =
-        case NSkip > 0 of
-            true ->
-                {Result, _} = qlc_next_answers(Cursor, NSkip),
-                Result;
-            false ->
-                more
-        end,
-    case SkipResult of
-        closed ->
-            {ok, false, []};
-        more ->
-            case qlc_next_answers(Cursor, Limit) of
-                {closed, Rows} ->
-                    {ok, false, Rows};
-                {more, Rows} ->
-                    qlc:delete_cursor(Cursor),
-                    {ok, true, Rows}
-            end
+    S2 = emqx_utils_stream:drop(NSkip, S1),
+    case emqx_utils_stream:consume(Limit, S2) of
+        {Rows, _S3} ->
+            {ok, true, Rows};
+        Rows when is_list(Rows) ->
+            {ok, false, Rows}
     end.
 
 clean(_) ->
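match_messages/3 and page_read/4 now lean on consume/2, which either returns {Rows, Rest} when the stream still has elements or a plain list when it is exhausted; the leftover stream itself acts as the cursor, so there is no QLC cursor process to clean up. A small sketch of both return shapes (toy data, not retainer messages):

%% consume/2 returns {TakenRows, RestStream}, or just a list once the stream ends.
batch_shapes_sketch() ->
    S = emqx_utils_stream:list([1, 2, 3, 4, 5]),
    {Batch1, Rest} = emqx_utils_stream:consume(2, S),
    [1, 2] = Batch1,
    %% fewer than the requested number of elements remain: a plain list comes back
    [3, 4, 5] = emqx_utils_stream:consume(10, Rest).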
@@ -333,58 +325,56 @@ do_store_retained_index(Key, ExpiryTime) ->
     },
     mnesia:write(?TAB_INDEX, RetainedIndex, write).
 
-msg_table(SearchTable) ->
-    qlc:q([
-        Msg
-     || #retained_message{
-            msg = Msg
-        } <- SearchTable
-    ]).
+msg_stream(SearchStream) ->
+    emqx_utils_stream:map(
+        fun(#retained_message{msg = Msg}) -> Msg end,
+        SearchStream
+    ).
 
-search_table(Tokens, Now) ->
+search_stream(Tokens, Now) ->
     Indices = dirty_indices(read),
     Index = emqx_retainer_index:select_index(Tokens, Indices),
-    search_table(Index, Tokens, Now).
+    search_stream(Index, Tokens, Now).
 
-search_table(undefined, Tokens, Now) ->
+search_stream(undefined, Tokens, Now) ->
     Ms = make_message_match_spec(Tokens, Now),
-    ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]);
-search_table(Index, FilterTokens, Now) ->
+    emqx_utils_stream:ets(
+        fun
+            (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
+            (Cont) -> ets:select(Cont)
+        end
+    );
+search_stream(Index, FilterTokens, Now) ->
     {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now),
-    Topics = [
-        emqx_retainer_index:restore_topic(Key)
-     || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms)
-    ],
-    RetainedMsgQH = qlc:q([
-        ets:lookup(?TAB_MESSAGE, TopicTokens)
-     || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens)
-    ]),
-    qlc:q([
-        RetainedMsg
-     || [
-            #retained_message{
-                expiry_time = ExpiryTime
-            } = RetainedMsg
-        ] <- RetainedMsgQH,
-        (ExpiryTime == 0) or (ExpiryTime > Now)
-    ]).
+    IndexRecordStream = emqx_utils_stream:ets(
+        fun
+            (undefined) -> ets:select(?TAB_INDEX, Ms, 1);
+            (Cont) -> ets:select(Cont)
+        end
+    ),
+    TopicStream = emqx_utils_stream:map(
+        fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end,
+        IndexRecordStream
+    ),
+    MatchingTopicStream = emqx_utils_stream:filter(
+        fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
+        TopicStream
+    ),
+    RetainMsgStream = emqx_utils_stream:chainmap(
+        fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end,
+        MatchingTopicStream
+    ),
+    ValidRetainMsgStream = emqx_utils_stream:filter(
+        fun(#retained_message{expiry_time = ExpiryTime}) ->
+            ExpiryTime =:= 0 orelse ExpiryTime > Now
+        end,
+        RetainMsgStream
+    ),
+    ValidRetainMsgStream.
 
 match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true;
 match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens).
 
-clear_batch(Indices, QC) ->
-    {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE),
-    lists:foreach(
-        fun(RetainedMsg) ->
-            delete_message_with_indices(RetainedMsg, Indices)
-        end,
-        Rows
-    ),
-    case Result of
-        closed -> ok;
-        more -> clear_batch(Indices, QC)
-    end.
-
 delete_message_by_topic(TopicTokens, Indices) ->
     case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of
         [] -> ok;
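The index-based search_stream/3 is now a composition of ets/1, map/2, filter/2 and chainmap/2, each step staying lazy until the caller consumes the result. For intuition, a toy composition of the same combinators over plain lists instead of the ETS tables (names and data are illustrative):

%% Sketch: key stream -> per-key lookup -> filter, all lazy until consume/1.
index_lookup_sketch() ->
    Lookup = fun(K) -> emqx_utils_stream:list([{K, V} || V <- [K * 10, K * 11]]) end,
    Keys = emqx_utils_stream:list([1, 2, 3]),
    Pairs = emqx_utils_stream:chainmap(Lookup, Keys),
    Valid = emqx_utils_stream:filter(fun({_K, V}) -> V rem 2 =:= 0 end, Pairs),
    %% [{1,10},{2,20},{2,22},{3,30}]
    emqx_utils_stream:consume(Valid).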
@@ -424,21 +414,6 @@ read_messages(Topic) ->
         end
     end.
 
-qlc_next_answers(QC, N) ->
-    case qlc:next_answers(QC, N) of
-        NextAnswers when
-            is_list(NextAnswers) andalso
-                length(NextAnswers) < N
-        ->
-            qlc:delete_cursor(QC),
-            {closed, NextAnswers};
-        NextAnswers when is_list(NextAnswers) ->
-            {more, NextAnswers};
-        {error, Module, Reason} ->
-            qlc:delete_cursor(QC),
-            error({qlc_error, Module, Reason})
-    end.
-
 make_message_match_spec(Tokens, NowMs) ->
     Cond = emqx_retainer_index:condition(Tokens),
     MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},
@@ -567,9 +542,11 @@ reindex(NewIndices, Force, StatusFun) when
             {atomic, ok} = mria:clear_table(?TAB_INDEX),
 
             %% Fill index records in batches.
-            QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
-
-            ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
+            TopicStream = emqx_utils_stream:map(
+                fun(#retained_message{topic = Topic}) -> Topic end,
+                ets_stream(?TAB_MESSAGE)
+            ),
+            ok = reindex_batch(TopicStream, 0, StatusFun),
 
             %% Enable read indices and unlock reindexing.
             finalize_reindex();
@@ -647,12 +624,12 @@ reindex_topic(Indices, Topic) ->
             ok
     end.
 
-reindex_batch(QC, Done, StatusFun) ->
-    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of
-        {atomic, {more, NewDone}} ->
+reindex_batch(Stream0, Done, StatusFun) ->
+    case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of
+        {atomic, {more, NewDone, Stream1}} ->
             _ = StatusFun(NewDone),
-            reindex_batch(QC, NewDone, StatusFun);
-        {atomic, {closed, NewDone}} ->
+            reindex_batch(Stream1, NewDone, StatusFun);
+        {atomic, {done, NewDone}} ->
             _ = StatusFun(NewDone),
             ok;
         {aborted, Reason} ->
@@ -663,14 +640,26 @@ reindex_batch(QC, Done, StatusFun) ->
             {error, Reason}
     end.
 
-do_reindex_batch(QC, Done) ->
+do_reindex_batch(Stream0, Done) ->
     Indices = db_indices(write),
-    {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE),
+    Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0),
+    Topics =
+        case Result of
+            {Rows, _Stream1} ->
+                Rows;
+            Rows when is_list(Rows) ->
+                Rows
+        end,
     ok = lists:foreach(
         fun(Topic) -> reindex_topic(Indices, Topic) end,
         Topics
     ),
-    {Status, Done + length(Topics)}.
+    case Result of
+        {_Rows, Stream1} ->
+            {more, Done + length(Topics), Stream1};
+        _Rows ->
+            {done, Done + length(Topics)}
+    end.
 
 wait_dispatch_complete(Timeout) ->
     Nodes = mria:running_nodes(),
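reindex_batch/3 now carries the leftover stream between transactions: each round consumes up to ?REINDEX_BATCH_SIZE topics and gets back either {more, Count, Rest} or {done, Count} when the stream is exhausted. A minimal sketch of that drive loop over a generic stream (batch size and per-element work are placeholders):

%% Sketch: drive a stream in fixed-size batches until it is exhausted.
batched_drive_sketch(Stream, BatchSize, WorkFun) ->
    case emqx_utils_stream:consume(BatchSize, Stream) of
        {Rows, Rest} ->
            lists:foreach(WorkFun, Rows),
            batched_drive_sketch(Rest, BatchSize, WorkFun);
        Rows when is_list(Rows) ->
            lists:foreach(WorkFun, Rows),
            ok
    end.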
@@ -706,3 +695,11 @@ are_indices_updated(Indices) ->
         _ ->
             false
     end.
+
+ets_stream(Tab) ->
+    emqx_utils_stream:ets(
+        fun
+            (undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE);
+            (Cont) -> ets:match_object(Cont)
+        end
+    ).

View File

@@ -78,19 +78,10 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)).
 
 init_per_group(mnesia_without_indices, Config) ->
-    mnesia:clear_table(?TAB_INDEX_META),
-    mnesia:clear_table(?TAB_INDEX),
-    mnesia:clear_table(?TAB_MESSAGE),
-    Config;
+    [{index, false} | Config];
 init_per_group(mnesia_reindex, Config) ->
-    emqx_retainer_mnesia:populate_index_meta(),
-    mnesia:clear_table(?TAB_INDEX),
-    mnesia:clear_table(?TAB_MESSAGE),
     Config;
 init_per_group(_, Config) ->
-    emqx_retainer_mnesia:populate_index_meta(),
-    mnesia:clear_table(?TAB_INDEX),
-    mnesia:clear_table(?TAB_MESSAGE),
     Config.
 
 end_per_group(_Group, Config) ->
@@ -98,9 +89,13 @@ end_per_group(_Group, Config) ->
     Config.
 
 init_per_testcase(_TestCase, Config) ->
-    mnesia:clear_table(?TAB_INDEX),
-    mnesia:clear_table(?TAB_MESSAGE),
-    emqx_retainer_mnesia:populate_index_meta(),
+    case ?config(index, Config) of
+        false ->
+            mnesia:clear_table(?TAB_INDEX_META);
+        _ ->
+            emqx_retainer_mnesia:populate_index_meta()
+    end,
+    emqx_retainer:clean(),
     Config.
 
 end_per_testcase(t_flow_control, _Config) ->
@@ -315,7 +310,7 @@ t_message_expiry(Config) ->
         ok = emqtt:disconnect(C1)
     end,
-    with_conf(ConfMod, Case).
+    with_conf(Config, ConfMod, Case).
 
 t_message_expiry_2(Config) ->
     ConfMod = fun(Conf) ->
@@ -337,9 +332,9 @@ t_message_expiry_2(Config) ->
         ok = emqtt:disconnect(C1)
     end,
-    with_conf(ConfMod, Case).
+    with_conf(Config, ConfMod, Case).
 
-t_table_full(_) ->
+t_table_full(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}}
     end,
@@ -356,7 +351,7 @@ t_table_full(_) ->
         ok = emqtt:disconnect(C1)
     end,
-    with_conf(ConfMod, Case).
+    with_conf(Config, ConfMod, Case).
 
 t_clean(Config) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@@ -440,59 +435,14 @@ t_flow_control(_) ->
     Diff = End - Begin,
     ?assert(
-        Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9),
+        Diff > timer:seconds(2.1) andalso Diff < timer:seconds(4),
         lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
     ),
     ok = emqtt:disconnect(C1),
     ok.
 
-t_cursor_cleanup(_) ->
-    setup_slow_delivery(),
-    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
-    {ok, _} = emqtt:connect(C1),
-
-    lists:foreach(
-        fun(I) ->
-            emqtt:publish(
-                C1,
-                <<"retained/", (integer_to_binary(I))/binary>>,
-                <<"this is a retained message">>,
-                [{qos, 0}, {retain, true}]
-            )
-        end,
-        lists:seq(1, 5)
-    ),
-
-    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
-    snabbkaffe:start_trace(),
-    ?assertWaitEvent(
-        emqtt:disconnect(C1),
-        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
-        2000
-    ),
-    ?assertEqual(0, qlc_process_count()),
-
-    {Pid, Ref} = spawn_monitor(fun() -> ok end),
-    receive
-        {'DOWN', Ref, _, _, _} -> ok
-    after 1000 -> ct:fail("should receive 'DOWN' message")
-    end,
-    ?assertWaitEvent(
-        emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid),
-        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>},
-        2000
-    ),
-    ?assertEqual(0, qlc_process_count()),
-    snabbkaffe:stop(),
-    ok.
-
-t_clear_expired(_) ->
+t_clear_expired(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{
             <<"msg_clear_interval">> := <<"1s">>,
@@ -528,9 +478,9 @@ t_clear_expired(_) ->
         ok = emqtt:disconnect(C1)
     end,
-    with_conf(ConfMod, Case).
+    with_conf(Config, ConfMod, Case).
 
-t_max_payload_size(_) ->
+t_max_payload_size(Config) ->
     ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end,
     Case = fun() ->
         emqx_retainer:clean(),
@@ -559,7 +509,7 @@ t_max_payload_size(_) ->
         ok = emqtt:disconnect(C1)
     end,
-    with_conf(ConfMod, Case).
+    with_conf(Config, ConfMod, Case).
 
 t_page_read(_) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
@@ -866,10 +816,11 @@ receive_messages(Count, Msgs) ->
             Msgs
     end.
 
-with_conf(ConfMod, Case) ->
+with_conf(CTConfig, ConfMod, Case) ->
     Conf = emqx:get_raw_config([retainer]),
     NewConf = ConfMod(Conf),
     emqx_retainer:update_config(NewConf),
+    ?config(index, CTConfig) =:= false andalso mria:clear_table(?TAB_INDEX_META),
     try
         Case(),
         {ok, _} = emqx_retainer:update_config(Conf)

View File

@@ -30,6 +30,7 @@
     read_message/2,
     page_read/4,
     match_messages/3,
+    delete_cursor/2,
     clear_expired/1,
     clean/1,
     size/1
@@ -63,6 +64,8 @@ page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}.
 match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}.
 
+delete_cursor(_Context, _Cursor) -> ok.
+
 clear_expired(_Context) -> ok.
 
 clean(_Context) -> ok.

View File

@@ -28,14 +28,18 @@
     chain/2,
     repeat/1,
     interleave/2,
-    limit_length/2
+    limit_length/2,
+    filter/2,
+    drop/2,
+    chainmap/2
 ]).
 
 %% Evaluating
 -export([
     next/1,
     consume/1,
-    consume/2
+    consume/2,
+    foreach/2
 ]).
 
 %% Streams from ETS tables
@@ -51,8 +55,9 @@
 -export_type([stream/1]).
 
 %% @doc A stream is essentially a lazy list.
--type stream(T) :: fun(() -> next(T) | []).
--type next(T) :: nonempty_improper_list(T, stream(T)).
+-type stream_tail(T) :: fun(() -> next(T) | []).
+-type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T).
+-type next(T) :: nonempty_improper_list(T, stream_tail(T)).
 
 -dialyzer(no_improper_lists).
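The stream() type is widened so that a plain list is itself a valid stream, with next/1 (changed further below) dispatching on all three shapes: a thunk, an already-evaluated improper list, or a list. A quick sketch of what that buys callers, assuming the widened type (the function name is made up):

%% With the widened type, ordinary lists can be fed to stream combinators directly.
plain_list_stream_sketch() ->
    S = emqx_utils_stream:map(fun(N) -> N + 1 end, [1, 2, 3]),
    [2, 3, 4] = emqx_utils_stream:consume(S).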
@@ -63,15 +68,12 @@
 %% @doc Make a stream that produces no values.
 -spec empty() -> stream(none()).
 empty() ->
-    fun() -> [] end.
+    [].
 
 %% @doc Make a stream out of the given list.
 %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`.
 -spec list([T]) -> stream(T).
-list([]) ->
-    empty();
-list([X | Rest]) ->
-    fun() -> [X | list(Rest)] end.
+list(L) -> L.
 
 %% @doc Make a stream with a single element infinitely repeated
 -spec const(T) -> stream(T).
@@ -102,6 +104,64 @@ map(F, S) ->
         end
     end.
 
+%% @doc Make a stream by filtering the underlying stream with a predicate function.
+filter(F, S) ->
+    FilterNext = fun FilterNext(St) ->
+        case next(St) of
+            [X | Rest] ->
+                case F(X) of
+                    true ->
+                        [X | filter(F, Rest)];
+                    false ->
+                        FilterNext(Rest)
+                end;
+            [] ->
+                []
+        end
+    end,
+    fun() -> FilterNext(S) end.
+
+%% @doc Consumes the stream and applies the given function to each element.
+foreach(F, S) ->
+    case next(S) of
+        [X | Rest] ->
+            F(X),
+            foreach(F, Rest);
+        [] ->
+            ok
+    end.
+
+%% @doc Drops N first elements from the stream
+-spec drop(non_neg_integer(), stream(T)) -> stream(T).
+drop(N, S) ->
+    DropNext = fun DropNext(M, St) ->
+        case next(St) of
+            [_X | Rest] when M > 0 ->
+                DropNext(M - 1, Rest);
+            Next ->
+                Next
+        end
+    end,
+    fun() -> DropNext(N, S) end.
+
+%% @doc Stream version of flatmap.
+-spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y).
+chainmap(F, S) ->
+    ChainNext = fun ChainNext(St) ->
+        case next(St) of
+            [X | Rest] ->
+                case next(F(X)) of
+                    [Y | YRest] ->
+                        [Y | chain(YRest, chainmap(F, Rest))];
+                    [] ->
+                        ChainNext(Rest)
+                end;
+            [] ->
+                []
+        end
+    end,
+    fun() -> ChainNext(S) end.
+
 %% @doc Transpose a list of streams into a stream producing lists of their respective values.
 %% The resulting stream is as long as the shortest of the input streams.
 -spec transpose([stream(X)]) -> stream([X]).
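Like map/2, the new filter/2, drop/2 and chainmap/2 only wrap the source in another thunk; no element is touched until next/1 or consume/1 forces the pipeline, which is what lets the retainer stack several of them over an ETS scan without materialising the table. A tiny laziness check (the stand-alone function name is made up):

%% Nothing is evaluated while the pipeline is being built; consume/1 drives it.
lazy_pipeline_sketch() ->
    S0 = emqx_utils_stream:list(lists:seq(1, 10)),
    S1 = emqx_utils_stream:drop(2, S0),
    S2 = emqx_utils_stream:filter(fun(N) -> N rem 2 =:= 0 end, S1),
    true = is_function(S2, 0),
    [4, 6, 8, 10] = emqx_utils_stream:consume(S2).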
@@ -172,7 +232,7 @@ repeat(S) ->
 interleave(L0, ContinueAtEmpty) ->
     L = lists:map(
         fun
-            (Stream) when is_function(Stream) ->
+            (Stream) when is_function(Stream) or is_list(Stream) ->
                 {1, Stream};
             (A = {N, _}) when N >= 0 ->
                 A
@@ -201,8 +261,12 @@ limit_length(N, S) when N >= 0 ->
 %% @doc Produce the next value from the stream.
 -spec next(stream(T)) -> next(T) | [].
-next(S) ->
-    S().
+next(EvalNext) when is_function(EvalNext) ->
+    EvalNext();
+next([_ | _Rest] = EvaluatedNext) ->
+    EvaluatedNext;
+next([]) ->
+    [].
 
 %% @doc Consume the stream and return a list of all produced values.
 -spec consume(stream(T)) -> [T].
@@ -243,16 +307,16 @@ consume(N, S, Acc) ->
 %% * `ets:match/1` / `ets:match/3`
 %% * `ets:match_object/1` / `ets:match_object/3`
 -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
-ets(ContF) ->
+ets(ContF) when is_function(ContF) ->
     ets(undefined, ContF).
 
 ets(Cont, ContF) ->
     fun() ->
         case ContF(Cont) of
             {Records, '$end_of_table'} ->
-                next(list(Records));
+                next(Records);
             {Records, NCont} ->
-                next(chain(list(Records), ets(NCont, ContF)));
+                next(chain(Records, ets(NCont, ContF)));
             '$end_of_table' ->
                 []
         end

View File

@@ -74,6 +74,72 @@ chain_list_map_test() ->
         emqx_utils_stream:consume(S)
     ).
 
+filter_test() ->
+    S = emqx_utils_stream:filter(
+        fun(N) -> N rem 2 =:= 0 end,
+        emqx_utils_stream:chain(
+            emqx_utils_stream:list([1, 2, 3]),
+            emqx_utils_stream:chain(
+                emqx_utils_stream:empty(),
+                emqx_utils_stream:list([4, 5, 6])
+            )
+        )
+    ),
+    ?assertEqual(
+        [2, 4, 6],
+        emqx_utils_stream:consume(S)
+    ).
+
+drop_test() ->
+    S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])),
+    ?assertEqual(
+        [3, 4, 5],
+        emqx_utils_stream:consume(S)
+    ).
+
+foreach_test() ->
+    Self = self(),
+    ok = emqx_utils_stream:foreach(
+        fun(N) -> erlang:send(Self, N) end,
+        emqx_utils_stream:chain(
+            emqx_utils_stream:list([1, 2, 3]),
+            emqx_utils_stream:chain(
+                emqx_utils_stream:empty(),
+                emqx_utils_stream:list([4, 5, 6])
+            )
+        )
+    ),
+    ?assertEqual(
+        [1, 2, 3, 4, 5, 6],
+        emqx_utils_stream:consume(emqx_utils_stream:mqueue(100))
+    ).
+
+chainmap_test() ->
+    S = emqx_utils_stream:chainmap(
+        fun(N) ->
+            case N rem 2 of
+                1 ->
+                    emqx_utils_stream:chain(
+                        emqx_utils_stream:chain(emqx_utils_stream:list([N]), []),
+                        emqx_utils_stream:list([N + 1])
+                    );
+                0 ->
+                    emqx_utils_stream:empty()
+            end
+        end,
+        emqx_utils_stream:chain(
+            emqx_utils_stream:list([1, 2, 3]),
+            emqx_utils_stream:chain(
+                emqx_utils_stream:empty(),
+                emqx_utils_stream:list([4, 5, 6])
+            )
+        )
+    ),
+    ?assertEqual(
+        [1, 2, 3, 4, 5, 6],
+        emqx_utils_stream:consume(S)
+    ).
+
 transpose_test() ->
     S = emqx_utils_stream:transpose([
         emqx_utils_stream:list([1, 2, 3]),
@@ -173,6 +239,21 @@ interleave_stop_test() ->
         emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false))
     ).
 
+ets_test() ->
+    T = ets:new(tab, [ordered_set]),
+    Objects = [{N, N} || N <- lists:seq(1, 10)],
+    ets:insert(T, Objects),
+    S = emqx_utils_stream:ets(
+        fun
+            (undefined) -> ets:match_object(T, '_', 4);
+            (Cont) -> ets:match_object(Cont)
+        end
+    ),
+    ?assertEqual(
+        Objects,
+        emqx_utils_stream:consume(S)
+    ).
+
 csv_test() ->
     Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>,
     ?assertEqual(