diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index 0c9631896..8a50bd19f 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -449,14 +449,8 @@ group_match_spec(UserGroup, QString) -> %% parse import file/data parse_import_users(Filename, FileData, Convertor) -> - Eval = fun _Eval(F) -> - case F() of - [] -> []; - [User | F1] -> [Convertor(User) | _Eval(F1)] - end - end, - ReaderFn = reader_fn(Filename, FileData), - Users = Eval(ReaderFn), + UserStream = reader_fn(Filename, FileData), + Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)), NewUsersCount = lists:foldl( fun( diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index daaa776b7..77297cdcd 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -23,7 +23,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). --include_lib("stdlib/include/qlc.hrl"). %% emqx_retainer callbacks -export([ @@ -64,12 +63,13 @@ -define(META_KEY, index_meta). --define(CLEAR_BATCH_SIZE, 1000). -define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_DISPATCH_WAIT, 30000). -define(REINDEX_RPC_RETRY_INTERVAL, 1000). -define(REINDEX_INDEX_UPDATE_WAIT, 30000). +-define(MESSAGE_SCAN_BATCH_SIZE, 100). + %%-------------------------------------------------------------------- %% Management API %%-------------------------------------------------------------------- @@ -177,15 +177,20 @@ clear_expired(_) -> clear_expired() -> NowMs = erlang:system_time(millisecond), - QH = qlc:q([ - RetainedMsg - || #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg <- ets:table(?TAB_MESSAGE), - (ExpiryTime =/= 0) and (ExpiryTime < NowMs) - ]), - QC = qlc:cursor(QH), - clear_batch(dirty_indices(write), QC). + S0 = ets_stream(?TAB_MESSAGE), + S1 = emqx_utils_stream:filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =/= 0 andalso ExpiryTime < NowMs + end, + S0 + ), + DirtyWriteIndices = dirty_indices(write), + emqx_utils_stream:foreach( + fun(RetainedMsg) -> + delete_message_with_indices(RetainedMsg, DirtyWriteIndices) + end, + S1 + ). delete_message(_State, Topic) -> Tokens = topic_to_tokens(Topic), @@ -193,13 +198,11 @@ delete_message(_State, Topic) -> false -> ok = delete_message_by_topic(Tokens, dirty_indices(write)); true -> - QH = search_table(Tokens, 0), - qlc:fold( - fun(RetainedMsg, _) -> - ok = delete_message_with_indices(RetainedMsg, dirty_indices(write)) - end, - undefined, - QH + S = search_stream(Tokens, 0), + DirtyWriteIndices = dirty_indices(write), + emqx_utils_stream:foreach( + fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, + S ) end. @@ -209,59 +212,48 @@ read_message(_State, Topic) -> match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), - QH = msg_table(search_table(Tokens, Now)), + S = msg_stream(search_stream(Tokens, Now)), case batch_read_number() of all_remaining -> - {ok, qlc:eval(QH), undefined}; + {ok, emqx_utils_stream:consume(S), undefined}; BatchNum when is_integer(BatchNum) -> - Cursor = qlc:cursor(QH), - match_messages(State, Topic, {Cursor, BatchNum}) + match_messages(State, Topic, {S, BatchNum}) end; -match_messages(_State, _Topic, {Cursor, BatchNum}) -> - case qlc_next_answers(Cursor, BatchNum) of - {closed, Rows} -> - {ok, Rows, undefined}; - {more, Rows} -> - {ok, Rows, {Cursor, BatchNum}} +match_messages(_State, _Topic, {S0, BatchNum}) -> + case emqx_utils_stream:consume(BatchNum, S0) of + {Rows, S1} -> + {ok, Rows, {S1, BatchNum}}; + Rows when is_list(Rows) -> + {ok, Rows, undefined} end. -delete_cursor(_State, {Cursor, _}) -> - qlc:delete_cursor(Cursor); -delete_cursor(_State, undefined) -> +delete_cursor(_State, _Cursor) -> ok. page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), - QH = + S0 = case Topic of undefined -> - msg_table(search_table(undefined, ['#'], Now)); + msg_stream(search_stream(undefined, ['#'], Now)); _ -> Tokens = topic_to_tokens(Topic), - msg_table(search_table(Tokens, Now)) + msg_stream(search_stream(Tokens, Now)) end, - OrderedQH = qlc:sort(QH, {order, fun compare_message/2}), - Cursor = qlc:cursor(OrderedQH), + %% This is very inefficient, but we are limited with inherited API + S1 = emqx_utils_stream:list( + lists:sort( + fun compare_message/2, + emqx_utils_stream:consume(S0) + ) + ), NSkip = (Page - 1) * Limit, - SkipResult = - case NSkip > 0 of - true -> - {Result, _} = qlc_next_answers(Cursor, NSkip), - Result; - false -> - more - end, - case SkipResult of - closed -> - {ok, false, []}; - more -> - case qlc_next_answers(Cursor, Limit) of - {closed, Rows} -> - {ok, false, Rows}; - {more, Rows} -> - qlc:delete_cursor(Cursor), - {ok, true, Rows} - end + S2 = emqx_utils_stream:drop(NSkip, S1), + case emqx_utils_stream:consume(Limit, S2) of + {Rows, _S3} -> + {ok, true, Rows}; + Rows when is_list(Rows) -> + {ok, false, Rows} end. clean(_) -> @@ -333,58 +325,56 @@ do_store_retained_index(Key, ExpiryTime) -> }, mnesia:write(?TAB_INDEX, RetainedIndex, write). -msg_table(SearchTable) -> - qlc:q([ - Msg - || #retained_message{ - msg = Msg - } <- SearchTable - ]). +msg_stream(SearchStream) -> + emqx_utils_stream:map( + fun(#retained_message{msg = Msg}) -> Msg end, + SearchStream + ). -search_table(Tokens, Now) -> +search_stream(Tokens, Now) -> Indices = dirty_indices(read), Index = emqx_retainer_index:select_index(Tokens, Indices), - search_table(Index, Tokens, Now). + search_stream(Index, Tokens, Now). -search_table(undefined, Tokens, Now) -> +search_stream(undefined, Tokens, Now) -> Ms = make_message_match_spec(Tokens, Now), - ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]); -search_table(Index, FilterTokens, Now) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1); + (Cont) -> ets:select(Cont) + end + ); +search_stream(Index, FilterTokens, Now) -> {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now), - Topics = [ - emqx_retainer_index:restore_topic(Key) - || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms) - ], - RetainedMsgQH = qlc:q([ - ets:lookup(?TAB_MESSAGE, TopicTokens) - || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens) - ]), - qlc:q([ - RetainedMsg - || [ - #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg - ] <- RetainedMsgQH, - (ExpiryTime == 0) or (ExpiryTime > Now) - ]). + IndexRecordStream = emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_INDEX, Ms, 1); + (Cont) -> ets:select(Cont) + end + ), + TopicStream = emqx_utils_stream:map( + fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end, + IndexRecordStream + ), + MatchingTopicStream = emqx_utils_stream:filter( + fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, + TopicStream + ), + RetainMsgStream = emqx_utils_stream:chainmap( + fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end, + MatchingTopicStream + ), + ValidRetainMsgStream = emqx_utils_stream:filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =:= 0 orelse ExpiryTime > Now + end, + RetainMsgStream + ), + ValidRetainMsgStream. match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). -clear_batch(Indices, QC) -> - {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), - lists:foreach( - fun(RetainedMsg) -> - delete_message_with_indices(RetainedMsg, Indices) - end, - Rows - ), - case Result of - closed -> ok; - more -> clear_batch(Indices, QC) - end. - delete_message_by_topic(TopicTokens, Indices) -> case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of [] -> ok; @@ -424,21 +414,6 @@ read_messages(Topic) -> end end. -qlc_next_answers(QC, N) -> - case qlc:next_answers(QC, N) of - NextAnswers when - is_list(NextAnswers) andalso - length(NextAnswers) < N - -> - qlc:delete_cursor(QC), - {closed, NextAnswers}; - NextAnswers when is_list(NextAnswers) -> - {more, NextAnswers}; - {error, Module, Reason} -> - qlc:delete_cursor(QC), - error({qlc_error, Module, Reason}) - end. - make_message_match_spec(Tokens, NowMs) -> Cond = emqx_retainer_index:condition(Tokens), MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'}, @@ -567,9 +542,11 @@ reindex(NewIndices, Force, StatusFun) when {atomic, ok} = mria:clear_table(?TAB_INDEX), %% Fill index records in batches. - QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), - - ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), + TopicStream = emqx_utils_stream:map( + fun(#retained_message{topic = Topic}) -> Topic end, + ets_stream(?TAB_MESSAGE) + ), + ok = reindex_batch(TopicStream, 0, StatusFun), %% Enable read indices and unlock reindexing. finalize_reindex(); @@ -647,12 +624,12 @@ reindex_topic(Indices, Topic) -> ok end. -reindex_batch(QC, Done, StatusFun) -> - case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of - {atomic, {more, NewDone}} -> +reindex_batch(Stream0, Done, StatusFun) -> + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of + {atomic, {more, NewDone, Stream1}} -> _ = StatusFun(NewDone), - reindex_batch(QC, NewDone, StatusFun); - {atomic, {closed, NewDone}} -> + reindex_batch(Stream1, NewDone, StatusFun); + {atomic, {done, NewDone}} -> _ = StatusFun(NewDone), ok; {aborted, Reason} -> @@ -663,14 +640,26 @@ reindex_batch(QC, Done, StatusFun) -> {error, Reason} end. -do_reindex_batch(QC, Done) -> +do_reindex_batch(Stream0, Done) -> Indices = db_indices(write), - {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), + Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0), + Topics = + case Result of + {Rows, _Stream1} -> + Rows; + Rows when is_list(Rows) -> + Rows + end, ok = lists:foreach( fun(Topic) -> reindex_topic(Indices, Topic) end, Topics ), - {Status, Done + length(Topics)}. + case Result of + {_Rows, Stream1} -> + {more, Done + length(Topics), Stream1}; + _Rows -> + {done, Done + length(Topics)} + end. wait_dispatch_complete(Timeout) -> Nodes = mria:running_nodes(), @@ -706,3 +695,11 @@ are_indices_updated(Indices) -> _ -> false end. + +ets_stream(Tab) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE); + (Cont) -> ets:match_object(Cont) + end + ). diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index b29974068..9a269a509 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -78,19 +78,10 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_group(mnesia_without_indices, Config) -> - mnesia:clear_table(?TAB_INDEX_META), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), - Config; + [{index, false} | Config]; init_per_group(mnesia_reindex, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config; init_per_group(_, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config. end_per_group(_Group, Config) -> @@ -98,9 +89,13 @@ end_per_group(_Group, Config) -> Config. init_per_testcase(_TestCase, Config) -> - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), - emqx_retainer_mnesia:populate_index_meta(), + case ?config(index, Config) of + false -> + mnesia:clear_table(?TAB_INDEX_META); + _ -> + emqx_retainer_mnesia:populate_index_meta() + end, + emqx_retainer:clean(), Config. end_per_testcase(t_flow_control, _Config) -> @@ -315,7 +310,7 @@ t_message_expiry(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_message_expiry_2(Config) -> ConfMod = fun(Conf) -> @@ -337,9 +332,9 @@ t_message_expiry_2(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_table_full(_) -> +t_table_full(Config) -> ConfMod = fun(Conf) -> Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}} end, @@ -356,7 +351,7 @@ t_table_full(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_clean(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -440,59 +435,14 @@ t_flow_control(_) -> Diff = End - Begin, ?assert( - Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.1) andalso Diff < timer:seconds(4), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), ok = emqtt:disconnect(C1), ok. -t_cursor_cleanup(_) -> - setup_slow_delivery(), - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - lists:foreach( - fun(I) -> - emqtt:publish( - C1, - <<"retained/", (integer_to_binary(I))/binary>>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}] - ) - end, - lists:seq(1, 5) - ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), - - snabbkaffe:start_trace(), - - ?assertWaitEvent( - emqtt:disconnect(C1), - #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, - 2000 - ), - - ?assertEqual(0, qlc_process_count()), - - {Pid, Ref} = spawn_monitor(fun() -> ok end), - receive - {'DOWN', Ref, _, _, _} -> ok - after 1000 -> ct:fail("should receive 'DOWN' message") - end, - - ?assertWaitEvent( - emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid), - #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>}, - 2000 - ), - - ?assertEqual(0, qlc_process_count()), - - snabbkaffe:stop(), - - ok. - -t_clear_expired(_) -> +t_clear_expired(Config) -> ConfMod = fun(Conf) -> Conf#{ <<"msg_clear_interval">> := <<"1s">>, @@ -528,9 +478,9 @@ t_clear_expired(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_max_payload_size(_) -> +t_max_payload_size(Config) -> ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end, Case = fun() -> emqx_retainer:clean(), @@ -559,7 +509,7 @@ t_max_payload_size(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_page_read(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -866,10 +816,11 @@ receive_messages(Count, Msgs) -> Msgs end. -with_conf(ConfMod, Case) -> +with_conf(CTConfig, ConfMod, Case) -> Conf = emqx:get_raw_config([retainer]), NewConf = ConfMod(Conf), emqx_retainer:update_config(NewConf), + ?config(index, CTConfig) =:= false andalso mria:clear_table(?TAB_INDEX_META), try Case(), {ok, _} = emqx_retainer:update_config(Conf) diff --git a/apps/emqx_retainer/test/emqx_retainer_dummy.erl b/apps/emqx_retainer/test/emqx_retainer_dummy.erl index e72a2d0cf..86399edf1 100644 --- a/apps/emqx_retainer/test/emqx_retainer_dummy.erl +++ b/apps/emqx_retainer/test/emqx_retainer_dummy.erl @@ -30,6 +30,7 @@ read_message/2, page_read/4, match_messages/3, + delete_cursor/2, clear_expired/1, clean/1, size/1 @@ -63,6 +64,8 @@ page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}. match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}. +delete_cursor(_Context, _Cursor) -> ok. + clear_expired(_Context) -> ok. clean(_Context) -> ok. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 8b6db8292..510b3e377 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -28,14 +28,18 @@ chain/2, repeat/1, interleave/2, - limit_length/2 + limit_length/2, + filter/2, + drop/2, + chainmap/2 ]). %% Evaluating -export([ next/1, consume/1, - consume/2 + consume/2, + foreach/2 ]). %% Streams from ETS tables @@ -51,8 +55,9 @@ -export_type([stream/1]). %% @doc A stream is essentially a lazy list. --type stream(T) :: fun(() -> next(T) | []). --type next(T) :: nonempty_improper_list(T, stream(T)). +-type stream_tail(T) :: fun(() -> next(T) | []). +-type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T). +-type next(T) :: nonempty_improper_list(T, stream_tail(T)). -dialyzer(no_improper_lists). @@ -63,15 +68,12 @@ %% @doc Make a stream that produces no values. -spec empty() -> stream(none()). empty() -> - fun() -> [] end. + []. %% @doc Make a stream out of the given list. %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. -spec list([T]) -> stream(T). -list([]) -> - empty(); -list([X | Rest]) -> - fun() -> [X | list(Rest)] end. +list(L) -> L. %% @doc Make a stream with a single element infinitely repeated -spec const(T) -> stream(T). @@ -102,6 +104,64 @@ map(F, S) -> end end. +%% @doc Make a stream by filtering the underlying stream with a predicate function. +filter(F, S) -> + FilterNext = fun FilterNext(St) -> + case next(St) of + [X | Rest] -> + case F(X) of + true -> + [X | filter(F, Rest)]; + false -> + FilterNext(Rest) + end; + [] -> + [] + end + end, + fun() -> FilterNext(S) end. + +%% @doc Consumes the stream and applies the given function to each element. +foreach(F, S) -> + case next(S) of + [X | Rest] -> + F(X), + foreach(F, Rest); + [] -> + ok + end. + +%% @doc Drops N first elements from the stream +-spec drop(non_neg_integer(), stream(T)) -> stream(T). +drop(N, S) -> + DropNext = fun DropNext(M, St) -> + case next(St) of + [_X | Rest] when M > 0 -> + DropNext(M - 1, Rest); + Next -> + Next + end + end, + fun() -> DropNext(N, S) end. + +%% @doc Stream version of flatmap. +-spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y). +chainmap(F, S) -> + ChainNext = fun ChainNext(St) -> + case next(St) of + [X | Rest] -> + case next(F(X)) of + [Y | YRest] -> + [Y | chain(YRest, chainmap(F, Rest))]; + [] -> + ChainNext(Rest) + end; + [] -> + [] + end + end, + fun() -> ChainNext(S) end. + %% @doc Transpose a list of streams into a stream producing lists of their respective values. %% The resulting stream is as long as the shortest of the input streams. -spec transpose([stream(X)]) -> stream([X]). @@ -172,7 +232,7 @@ repeat(S) -> interleave(L0, ContinueAtEmpty) -> L = lists:map( fun - (Stream) when is_function(Stream) -> + (Stream) when is_function(Stream) or is_list(Stream) -> {1, Stream}; (A = {N, _}) when N >= 0 -> A @@ -201,8 +261,12 @@ limit_length(N, S) when N >= 0 -> %% @doc Produce the next value from the stream. -spec next(stream(T)) -> next(T) | []. -next(S) -> - S(). +next(EvalNext) when is_function(EvalNext) -> + EvalNext(); +next([_ | _Rest] = EvaluatedNext) -> + EvaluatedNext; +next([]) -> + []. %% @doc Consume the stream and return a list of all produced values. -spec consume(stream(T)) -> [T]. @@ -243,16 +307,16 @@ consume(N, S, Acc) -> %% * `ets:match/1` / `ets:match/3` %% * `ets:match_object/1` / `ets:match_object/3` -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). -ets(ContF) -> +ets(ContF) when is_function(ContF) -> ets(undefined, ContF). ets(Cont, ContF) -> fun() -> case ContF(Cont) of {Records, '$end_of_table'} -> - next(list(Records)); + next(Records); {Records, NCont} -> - next(chain(list(Records), ets(NCont, ContF))); + next(chain(Records, ets(NCont, ContF))); '$end_of_table' -> [] end diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index fe340d3ee..726a0880d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,72 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +filter_test() -> + S = emqx_utils_stream:filter( + fun(N) -> N rem 2 =:= 0 end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [2, 4, 6], + emqx_utils_stream:consume(S) + ). + +drop_test() -> + S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])), + ?assertEqual( + [3, 4, 5], + emqx_utils_stream:consume(S) + ). + +foreach_test() -> + Self = self(), + ok = emqx_utils_stream:foreach( + fun(N) -> erlang:send(Self, N) end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(100)) + ). + +chainmap_test() -> + S = emqx_utils_stream:chainmap( + fun(N) -> + case N rem 2 of + 1 -> + emqx_utils_stream:chain( + emqx_utils_stream:chain(emqx_utils_stream:list([N]), []), + emqx_utils_stream:list([N + 1]) + ); + 0 -> + emqx_utils_stream:empty() + end + end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(S) + ). + transpose_test() -> S = emqx_utils_stream:transpose([ emqx_utils_stream:list([1, 2, 3]), @@ -173,6 +239,21 @@ interleave_stop_test() -> emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false)) ). +ets_test() -> + T = ets:new(tab, [ordered_set]), + Objects = [{N, N} || N <- lists:seq(1, 10)], + ets:insert(T, Objects), + S = emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(T, '_', 4); + (Cont) -> ets:match_object(Cont) + end + ), + ?assertEqual( + Objects, + emqx_utils_stream:consume(S) + ). + csv_test() -> Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual(