From 0b39aaadbd8b471d26a30fa7280e9805b3a05217 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Sat, 11 May 2024 14:32:08 +0300 Subject: [PATCH 1/4] chore(retainer): get rid of qlc usage --- .../src/emqx_retainer_mnesia.erl | 278 ++++++++++-------- .../test/emqx_retainer_SUITE.erl | 87 ++---- .../test/emqx_retainer_dummy.erl | 3 + 3 files changed, 177 insertions(+), 191 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index daaa776b7..90b4f40ca 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -23,7 +23,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). --include_lib("stdlib/include/qlc.hrl"). %% emqx_retainer callbacks -export([ @@ -64,7 +63,6 @@ -define(META_KEY, index_meta). --define(CLEAR_BATCH_SIZE, 1000). -define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_DISPATCH_WAIT, 30000). -define(REINDEX_RPC_RETRY_INTERVAL, 1000). @@ -177,15 +175,20 @@ clear_expired(_) -> clear_expired() -> NowMs = erlang:system_time(millisecond), - QH = qlc:q([ - RetainedMsg - || #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg <- ets:table(?TAB_MESSAGE), - (ExpiryTime =/= 0) and (ExpiryTime < NowMs) - ]), - QC = qlc:cursor(QH), - clear_batch(dirty_indices(write), QC). + S0 = ets_stream(?TAB_MESSAGE), + S1 = stream_filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =/= 0 andalso ExpiryTime < NowMs + end, + S0 + ), + DirtyWriteIndices = dirty_indices(write), + stream_foreach( + fun(RetainedMsg) -> + delete_message_with_indices(RetainedMsg, DirtyWriteIndices) + end, + S1 + ). delete_message(_State, Topic) -> Tokens = topic_to_tokens(Topic), @@ -193,13 +196,11 @@ delete_message(_State, Topic) -> false -> ok = delete_message_by_topic(Tokens, dirty_indices(write)); true -> - QH = search_table(Tokens, 0), - qlc:fold( - fun(RetainedMsg, _) -> - ok = delete_message_with_indices(RetainedMsg, dirty_indices(write)) - end, - undefined, - QH + S = search_stream(Tokens, 0), + DirtyWriteIndices = dirty_indices(write), + stream_foreach( + fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, + S ) end. @@ -209,59 +210,52 @@ read_message(_State, Topic) -> match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), - QH = msg_table(search_table(Tokens, Now)), + S = msg_stream(search_stream(Tokens, Now)), case batch_read_number() of all_remaining -> - {ok, qlc:eval(QH), undefined}; + {ok, emqx_utils_stream:consume(S), undefined}; BatchNum when is_integer(BatchNum) -> - Cursor = qlc:cursor(QH), - match_messages(State, Topic, {Cursor, BatchNum}) + match_messages(undefined, Topic, {S, BatchNum}) end; -match_messages(_State, _Topic, {Cursor, BatchNum}) -> - case qlc_next_answers(Cursor, BatchNum) of - {closed, Rows} -> - {ok, Rows, undefined}; - {more, Rows} -> - {ok, Rows, {Cursor, BatchNum}} +match_messages(_State, _Topic, {S0, BatchNum}) -> + case emqx_utils_stream:consume(BatchNum, S0) of + {Rows, S1} -> + {ok, Rows, {S1, BatchNum}}; + Rows when is_list(Rows) -> + {ok, Rows, undefined} end. -delete_cursor(_State, {Cursor, _}) -> - qlc:delete_cursor(Cursor); -delete_cursor(_State, undefined) -> +delete_cursor(_State, _Cursor) -> ok. page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), - QH = + S0 = case Topic of undefined -> - msg_table(search_table(undefined, ['#'], Now)); + msg_stream(search_stream(undefined, ['#'], Now)); _ -> Tokens = topic_to_tokens(Topic), - msg_table(search_table(Tokens, Now)) + msg_stream(search_stream(Tokens, Now)) end, - OrderedQH = qlc:sort(QH, {order, fun compare_message/2}), - Cursor = qlc:cursor(OrderedQH), + %% This is very inefficient, but we are limited with inherited API + S1 = emqx_utils_stream:list( + lists:sort( + fun compare_message/2, + emqx_utils_stream:consume(S0) + ) + ), NSkip = (Page - 1) * Limit, - SkipResult = - case NSkip > 0 of - true -> - {Result, _} = qlc_next_answers(Cursor, NSkip), - Result; - false -> - more - end, - case SkipResult of - closed -> - {ok, false, []}; - more -> - case qlc_next_answers(Cursor, Limit) of - {closed, Rows} -> - {ok, false, Rows}; - {more, Rows} -> - qlc:delete_cursor(Cursor), - {ok, true, Rows} - end + case emqx_utils_stream:consume(NSkip, S1) of + {_, S2} -> + case emqx_utils_stream:consume(Limit, S2) of + {Rows, _S3} -> + {ok, true, Rows}; + Rows when is_list(Rows) -> + {ok, false, Rows} + end; + Rows when is_list(Rows) -> + {ok, false, []} end. clean(_) -> @@ -333,58 +327,63 @@ do_store_retained_index(Key, ExpiryTime) -> }, mnesia:write(?TAB_INDEX, RetainedIndex, write). -msg_table(SearchTable) -> - qlc:q([ - Msg - || #retained_message{ - msg = Msg - } <- SearchTable - ]). +msg_stream(SearchStream) -> + emqx_utils_stream:map( + fun(#retained_message{msg = Msg}) -> Msg end, + SearchStream + ). -search_table(Tokens, Now) -> +search_stream(Tokens, Now) -> Indices = dirty_indices(read), Index = emqx_retainer_index:select_index(Tokens, Indices), - search_table(Index, Tokens, Now). + search_stream(Index, Tokens, Now). -search_table(undefined, Tokens, Now) -> +search_stream(undefined, Tokens, Now) -> Ms = make_message_match_spec(Tokens, Now), - ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]); -search_table(Index, FilterTokens, Now) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1); + (Cont) -> ets:select(Cont) + end + ); +search_stream(Index, FilterTokens, Now) -> {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now), - Topics = [ - emqx_retainer_index:restore_topic(Key) - || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms) - ], - RetainedMsgQH = qlc:q([ - ets:lookup(?TAB_MESSAGE, TopicTokens) - || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens) - ]), - qlc:q([ - RetainedMsg - || [ - #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg - ] <- RetainedMsgQH, - (ExpiryTime == 0) or (ExpiryTime > Now) - ]). + IndexRecordStream = emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_INDEX, Ms, 1); + (Cont) -> ets:select(Cont) + end + ), + TopicStream = emqx_utils_stream:map( + fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end, + IndexRecordStream + ), + MatchingTopicStream = stream_filter( + fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, + TopicStream + ), + LookedUpRetainMsgStream = emqx_utils_stream:map( + fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, + MatchingTopicStream + ), + FoundAndValidLookedUpRetainMsgStream = stream_filter( + fun + ([#retained_message{expiry_time = ExpiryTime}]) -> + ExpiryTime =:= 0 orelse ExpiryTime >= Now; + ([]) -> + false + end, + LookedUpRetainMsgStream + ), + RetainMsgStream = emqx_utils_stream:map( + fun([RetainedMsg]) -> RetainedMsg end, + FoundAndValidLookedUpRetainMsgStream + ), + RetainMsgStream. match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). -clear_batch(Indices, QC) -> - {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), - lists:foreach( - fun(RetainedMsg) -> - delete_message_with_indices(RetainedMsg, Indices) - end, - Rows - ), - case Result of - closed -> ok; - more -> clear_batch(Indices, QC) - end. - delete_message_by_topic(TopicTokens, Indices) -> case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of [] -> ok; @@ -424,21 +423,6 @@ read_messages(Topic) -> end end. -qlc_next_answers(QC, N) -> - case qlc:next_answers(QC, N) of - NextAnswers when - is_list(NextAnswers) andalso - length(NextAnswers) < N - -> - qlc:delete_cursor(QC), - {closed, NextAnswers}; - NextAnswers when is_list(NextAnswers) -> - {more, NextAnswers}; - {error, Module, Reason} -> - qlc:delete_cursor(QC), - error({qlc_error, Module, Reason}) - end. - make_message_match_spec(Tokens, NowMs) -> Cond = emqx_retainer_index:condition(Tokens), MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'}, @@ -567,9 +551,11 @@ reindex(NewIndices, Force, StatusFun) when {atomic, ok} = mria:clear_table(?TAB_INDEX), %% Fill index records in batches. - QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), - - ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), + TopicStream = emqx_utils_stream:map( + fun(#retained_message{topic = Topic}) -> Topic end, + ets_stream(?TAB_MESSAGE) + ), + ok = reindex_batch(TopicStream, 0, StatusFun), %% Enable read indices and unlock reindexing. finalize_reindex(); @@ -647,12 +633,12 @@ reindex_topic(Indices, Topic) -> ok end. -reindex_batch(QC, Done, StatusFun) -> - case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of - {atomic, {more, NewDone}} -> +reindex_batch(Stream0, Done, StatusFun) -> + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of + {atomic, {more, NewDone, Stream1}} -> _ = StatusFun(NewDone), - reindex_batch(QC, NewDone, StatusFun); - {atomic, {closed, NewDone}} -> + reindex_batch(Stream1, NewDone, StatusFun); + {atomic, {done, NewDone}} -> _ = StatusFun(NewDone), ok; {aborted, Reason} -> @@ -663,14 +649,26 @@ reindex_batch(QC, Done, StatusFun) -> {error, Reason} end. -do_reindex_batch(QC, Done) -> +do_reindex_batch(Stream0, Done) -> Indices = db_indices(write), - {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), + Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0), + Topics = + case Result of + {Rows, _Stream1} -> + Rows; + Rows when is_list(Rows) -> + Rows + end, ok = lists:foreach( fun(Topic) -> reindex_topic(Indices, Topic) end, Topics ), - {Status, Done + length(Topics)}. + case Result of + {_Rows, Stream1} -> + {more, Done + length(Topics), Stream1}; + _Rows -> + {done, Done + length(Topics)} + end. wait_dispatch_complete(Timeout) -> Nodes = mria:running_nodes(), @@ -706,3 +704,37 @@ are_indices_updated(Indices) -> _ -> false end. + +ets_stream(Tab) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(Tab, '$1', 1); + (Cont) -> ets:match_object(Cont) + end + ). + +stream_foreach(F, S) -> + case emqx_utils_stream:next(S) of + [X | Rest] -> + F(X), + stream_foreach(F, Rest); + [] -> + ok + end. + +%% TODO: move to emqx_utils_stream +stream_filter(F, S) -> + FilterNext = fun FilterNext(St) -> + case emqx_utils_stream:next(St) of + [X | Rest] -> + case F(X) of + true -> + [X | stream_filter(F, Rest)]; + false -> + FilterNext(Rest) + end; + [] -> + [] + end + end, + fun() -> FilterNext(S) end. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index b29974068..9a269a509 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -78,19 +78,10 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_group(mnesia_without_indices, Config) -> - mnesia:clear_table(?TAB_INDEX_META), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), - Config; + [{index, false} | Config]; init_per_group(mnesia_reindex, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config; init_per_group(_, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config. end_per_group(_Group, Config) -> @@ -98,9 +89,13 @@ end_per_group(_Group, Config) -> Config. init_per_testcase(_TestCase, Config) -> - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), - emqx_retainer_mnesia:populate_index_meta(), + case ?config(index, Config) of + false -> + mnesia:clear_table(?TAB_INDEX_META); + _ -> + emqx_retainer_mnesia:populate_index_meta() + end, + emqx_retainer:clean(), Config. end_per_testcase(t_flow_control, _Config) -> @@ -315,7 +310,7 @@ t_message_expiry(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_message_expiry_2(Config) -> ConfMod = fun(Conf) -> @@ -337,9 +332,9 @@ t_message_expiry_2(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_table_full(_) -> +t_table_full(Config) -> ConfMod = fun(Conf) -> Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}} end, @@ -356,7 +351,7 @@ t_table_full(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_clean(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -440,59 +435,14 @@ t_flow_control(_) -> Diff = End - Begin, ?assert( - Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.1) andalso Diff < timer:seconds(4), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), ok = emqtt:disconnect(C1), ok. -t_cursor_cleanup(_) -> - setup_slow_delivery(), - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - lists:foreach( - fun(I) -> - emqtt:publish( - C1, - <<"retained/", (integer_to_binary(I))/binary>>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}] - ) - end, - lists:seq(1, 5) - ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), - - snabbkaffe:start_trace(), - - ?assertWaitEvent( - emqtt:disconnect(C1), - #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, - 2000 - ), - - ?assertEqual(0, qlc_process_count()), - - {Pid, Ref} = spawn_monitor(fun() -> ok end), - receive - {'DOWN', Ref, _, _, _} -> ok - after 1000 -> ct:fail("should receive 'DOWN' message") - end, - - ?assertWaitEvent( - emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid), - #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>}, - 2000 - ), - - ?assertEqual(0, qlc_process_count()), - - snabbkaffe:stop(), - - ok. - -t_clear_expired(_) -> +t_clear_expired(Config) -> ConfMod = fun(Conf) -> Conf#{ <<"msg_clear_interval">> := <<"1s">>, @@ -528,9 +478,9 @@ t_clear_expired(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_max_payload_size(_) -> +t_max_payload_size(Config) -> ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end, Case = fun() -> emqx_retainer:clean(), @@ -559,7 +509,7 @@ t_max_payload_size(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_page_read(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -866,10 +816,11 @@ receive_messages(Count, Msgs) -> Msgs end. -with_conf(ConfMod, Case) -> +with_conf(CTConfig, ConfMod, Case) -> Conf = emqx:get_raw_config([retainer]), NewConf = ConfMod(Conf), emqx_retainer:update_config(NewConf), + ?config(index, CTConfig) =:= false andalso mria:clear_table(?TAB_INDEX_META), try Case(), {ok, _} = emqx_retainer:update_config(Conf) diff --git a/apps/emqx_retainer/test/emqx_retainer_dummy.erl b/apps/emqx_retainer/test/emqx_retainer_dummy.erl index e72a2d0cf..86399edf1 100644 --- a/apps/emqx_retainer/test/emqx_retainer_dummy.erl +++ b/apps/emqx_retainer/test/emqx_retainer_dummy.erl @@ -30,6 +30,7 @@ read_message/2, page_read/4, match_messages/3, + delete_cursor/2, clear_expired/1, clean/1, size/1 @@ -63,6 +64,8 @@ page_read(_Context, _Topic, _Offset, _Limit) -> {ok, false, []}. match_messages(_Context, _Topic, _Cursor) -> {ok, [], 0}. +delete_cursor(_Context, _Cursor) -> ok. + clear_expired(_Context) -> ok. clean(_Context) -> ok. From 552b62236c88c43048d37f3fa7f1029ecfdccadc Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 13 May 2024 11:51:20 +0300 Subject: [PATCH 2/4] chore(retainer): move filter/foreach to emqx_utils_stream --- .../src/emqx_retainer_mnesia.erl | 40 ++++--------------- apps/emqx_utils/src/emqx_utils_stream.erl | 31 +++++++++++++- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 90b4f40ca..e61bd13fb 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -176,14 +176,14 @@ clear_expired(_) -> clear_expired() -> NowMs = erlang:system_time(millisecond), S0 = ets_stream(?TAB_MESSAGE), - S1 = stream_filter( + S1 = emqx_utils_stream:filter( fun(#retained_message{expiry_time = ExpiryTime}) -> ExpiryTime =/= 0 andalso ExpiryTime < NowMs end, S0 ), DirtyWriteIndices = dirty_indices(write), - stream_foreach( + emqx_utils_stream:foreach( fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, @@ -198,7 +198,7 @@ delete_message(_State, Topic) -> true -> S = search_stream(Tokens, 0), DirtyWriteIndices = dirty_indices(write), - stream_foreach( + emqx_utils_stream:foreach( fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, S ) @@ -207,7 +207,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(State, Topic, undefined) -> +match_messages(_State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), S = msg_stream(search_stream(Tokens, Now)), @@ -358,7 +358,7 @@ search_stream(Index, FilterTokens, Now) -> fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end, IndexRecordStream ), - MatchingTopicStream = stream_filter( + MatchingTopicStream = emqx_utils_stream:filter( fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, TopicStream ), @@ -366,7 +366,7 @@ search_stream(Index, FilterTokens, Now) -> fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, MatchingTopicStream ), - FoundAndValidLookedUpRetainMsgStream = stream_filter( + FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter( fun ([#retained_message{expiry_time = ExpiryTime}]) -> ExpiryTime =:= 0 orelse ExpiryTime >= Now; @@ -708,33 +708,7 @@ are_indices_updated(Indices) -> ets_stream(Tab) -> emqx_utils_stream:ets( fun - (undefined) -> ets:match_object(Tab, '$1', 1); + (undefined) -> ets:match_object(Tab, '_', 1); (Cont) -> ets:match_object(Cont) end ). - -stream_foreach(F, S) -> - case emqx_utils_stream:next(S) of - [X | Rest] -> - F(X), - stream_foreach(F, Rest); - [] -> - ok - end. - -%% TODO: move to emqx_utils_stream -stream_filter(F, S) -> - FilterNext = fun FilterNext(St) -> - case emqx_utils_stream:next(St) of - [X | Rest] -> - case F(X) of - true -> - [X | stream_filter(F, Rest)]; - false -> - FilterNext(Rest) - end; - [] -> - [] - end - end, - fun() -> FilterNext(S) end. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 8b6db8292..6ff43528b 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,6 +23,8 @@ const/1, mqueue/1, map/2, + filter/2, + foreach/2, transpose/1, chain/1, chain/2, @@ -102,6 +104,33 @@ map(F, S) -> end end. +%% @doc Make a stream by filtering the underlying stream with a predicate function. +filter(F, S) -> + FilterNext = fun FilterNext(St) -> + case emqx_utils_stream:next(St) of + [X | Rest] -> + case F(X) of + true -> + [X | filter(F, Rest)]; + false -> + FilterNext(Rest) + end; + [] -> + [] + end + end, + fun() -> FilterNext(S) end. + +%% @doc Consumes the stream and applies the given function to each element. +foreach(F, S) -> + case emqx_utils_stream:next(S) of + [X | Rest] -> + F(X), + foreach(F, Rest); + [] -> + ok + end. + %% @doc Transpose a list of streams into a stream producing lists of their respective values. %% The resulting stream is as long as the shortest of the input streams. -spec transpose([stream(X)]) -> stream([X]). @@ -243,7 +272,7 @@ consume(N, S, Acc) -> %% * `ets:match/1` / `ets:match/3` %% * `ets:match_object/1` / `ets:match_object/3` -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). -ets(ContF) -> +ets(ContF) when is_function(ContF) -> ets(undefined, ContF). ets(Cont, ContF) -> From e1ce6377f38793b0258f754c85483f2c4c3959ed Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 15 May 2024 18:57:28 +0300 Subject: [PATCH 3/4] chore(streams): add stream methods, optimize streams --- .../src/emqx_authn_mnesia.erl | 10 +-- apps/emqx_utils/src/emqx_utils_stream.erl | 71 ++++++++++++---- .../test/emqx_utils_stream_tests.erl | 84 +++++++++++++++++++ 3 files changed, 139 insertions(+), 26 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index 0c9631896..8a50bd19f 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -449,14 +449,8 @@ group_match_spec(UserGroup, QString) -> %% parse import file/data parse_import_users(Filename, FileData, Convertor) -> - Eval = fun _Eval(F) -> - case F() of - [] -> []; - [User | F1] -> [Convertor(User) | _Eval(F1)] - end - end, - ReaderFn = reader_fn(Filename, FileData), - Users = Eval(ReaderFn), + UserStream = reader_fn(Filename, FileData), + Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)), NewUsersCount = lists:foldl( fun( diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 6ff43528b..510b3e377 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,21 +23,23 @@ const/1, mqueue/1, map/2, - filter/2, - foreach/2, transpose/1, chain/1, chain/2, repeat/1, interleave/2, - limit_length/2 + limit_length/2, + filter/2, + drop/2, + chainmap/2 ]). %% Evaluating -export([ next/1, consume/1, - consume/2 + consume/2, + foreach/2 ]). %% Streams from ETS tables @@ -53,8 +55,9 @@ -export_type([stream/1]). %% @doc A stream is essentially a lazy list. --type stream(T) :: fun(() -> next(T) | []). --type next(T) :: nonempty_improper_list(T, stream(T)). +-type stream_tail(T) :: fun(() -> next(T) | []). +-type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T). +-type next(T) :: nonempty_improper_list(T, stream_tail(T)). -dialyzer(no_improper_lists). @@ -65,15 +68,12 @@ %% @doc Make a stream that produces no values. -spec empty() -> stream(none()). empty() -> - fun() -> [] end. + []. %% @doc Make a stream out of the given list. %% Essentially it's an opposite of `consume/1`, i.e. `L = consume(list(L))`. -spec list([T]) -> stream(T). -list([]) -> - empty(); -list([X | Rest]) -> - fun() -> [X | list(Rest)] end. +list(L) -> L. %% @doc Make a stream with a single element infinitely repeated -spec const(T) -> stream(T). @@ -107,7 +107,7 @@ map(F, S) -> %% @doc Make a stream by filtering the underlying stream with a predicate function. filter(F, S) -> FilterNext = fun FilterNext(St) -> - case emqx_utils_stream:next(St) of + case next(St) of [X | Rest] -> case F(X) of true -> @@ -123,7 +123,7 @@ filter(F, S) -> %% @doc Consumes the stream and applies the given function to each element. foreach(F, S) -> - case emqx_utils_stream:next(S) of + case next(S) of [X | Rest] -> F(X), foreach(F, Rest); @@ -131,6 +131,37 @@ foreach(F, S) -> ok end. +%% @doc Drops N first elements from the stream +-spec drop(non_neg_integer(), stream(T)) -> stream(T). +drop(N, S) -> + DropNext = fun DropNext(M, St) -> + case next(St) of + [_X | Rest] when M > 0 -> + DropNext(M - 1, Rest); + Next -> + Next + end + end, + fun() -> DropNext(N, S) end. + +%% @doc Stream version of flatmap. +-spec chainmap(fun((X) -> stream(Y)), stream(X)) -> stream(Y). +chainmap(F, S) -> + ChainNext = fun ChainNext(St) -> + case next(St) of + [X | Rest] -> + case next(F(X)) of + [Y | YRest] -> + [Y | chain(YRest, chainmap(F, Rest))]; + [] -> + ChainNext(Rest) + end; + [] -> + [] + end + end, + fun() -> ChainNext(S) end. + %% @doc Transpose a list of streams into a stream producing lists of their respective values. %% The resulting stream is as long as the shortest of the input streams. -spec transpose([stream(X)]) -> stream([X]). @@ -201,7 +232,7 @@ repeat(S) -> interleave(L0, ContinueAtEmpty) -> L = lists:map( fun - (Stream) when is_function(Stream) -> + (Stream) when is_function(Stream) or is_list(Stream) -> {1, Stream}; (A = {N, _}) when N >= 0 -> A @@ -230,8 +261,12 @@ limit_length(N, S) when N >= 0 -> %% @doc Produce the next value from the stream. -spec next(stream(T)) -> next(T) | []. -next(S) -> - S(). +next(EvalNext) when is_function(EvalNext) -> + EvalNext(); +next([_ | _Rest] = EvaluatedNext) -> + EvaluatedNext; +next([]) -> + []. %% @doc Consume the stream and return a list of all produced values. -spec consume(stream(T)) -> [T]. @@ -279,9 +314,9 @@ ets(Cont, ContF) -> fun() -> case ContF(Cont) of {Records, '$end_of_table'} -> - next(list(Records)); + next(Records); {Records, NCont} -> - next(chain(list(Records), ets(NCont, ContF))); + next(chain(Records, ets(NCont, ContF))); '$end_of_table' -> [] end diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index fe340d3ee..a0787dd1d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -74,6 +74,72 @@ chain_list_map_test() -> emqx_utils_stream:consume(S) ). +filter_test() -> + S = emqx_utils_stream:filter( + fun(N) -> N rem 2 =:= 0 end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [2, 4, 6], + emqx_utils_stream:consume(S) + ). + +drop_test() -> + S = emqx_utils_stream:drop(2, emqx_utils_stream:list([1, 2, 3, 4, 5])), + ?assertEqual( + [3, 4, 5], + emqx_utils_stream:consume(S) + ). + +foreach_test() -> + Self = self(), + ok = emqx_utils_stream:foreach( + fun(N) -> erlang:send(Self, N) end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(emqx_utils_stream:mqueue(100)) + ). + +chainmap_test() -> + S = emqx_utils_stream:chainmap( + fun(N) -> + case N rem 2 of + 1 -> + emqx_utils_stream:chain( + emqx_utils_stream:chain(emqx_utils_stream:list([N]), []), + emqx_utils_stream:list([N + 1]) + ); + 0 -> + emqx_utils_stream:empty() + end + end, + emqx_utils_stream:chain( + emqx_utils_stream:list([1, 2, 3]), + emqx_utils_stream:chain( + emqx_utils_stream:empty(), + emqx_utils_stream:list([4, 5, 6]) + ) + ) + ), + ?assertEqual( + [1, 2, 3, 4, 5, 6], + emqx_utils_stream:consume(S) + ). + transpose_test() -> S = emqx_utils_stream:transpose([ emqx_utils_stream:list([1, 2, 3]), @@ -173,6 +239,24 @@ interleave_stop_test() -> emqx_utils_stream:consume(emqx_utils_stream:interleave([{2, S1}, {2, S2}], false)) ). +ets_test() -> + T = ets:new(tab, [ordered_set]), + Objects = [{N, N} || N <- lists:seq(1, 10)], + lists:foreach( + fun(Object) -> ets:insert(T, Object) end, + Objects + ), + S = emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(T, '_', 4); + (Cont) -> ets:match_object(Cont) + end + ), + ?assertEqual( + Objects, + emqx_utils_stream:consume(S) + ). + csv_test() -> Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual( From 1a664c941b4371023ef8abf2fb8bbb3c49fcce6a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 15 May 2024 18:58:26 +0300 Subject: [PATCH 4/4] chore(retainer): scan table in batches, improve stream usage Co-authored-by: Thales Macedo Garitezi Co-authored-by: Zaiming (Stone) Shi --- .../src/emqx_retainer_mnesia.erl | 43 ++++++++----------- .../test/emqx_utils_stream_tests.erl | 5 +-- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index e61bd13fb..77297cdcd 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -68,6 +68,8 @@ -define(REINDEX_RPC_RETRY_INTERVAL, 1000). -define(REINDEX_INDEX_UPDATE_WAIT, 30000). +-define(MESSAGE_SCAN_BATCH_SIZE, 100). + %%-------------------------------------------------------------------- %% Management API %%-------------------------------------------------------------------- @@ -207,7 +209,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_State, Topic, undefined) -> +match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), S = msg_stream(search_stream(Tokens, Now)), @@ -215,7 +217,7 @@ match_messages(_State, Topic, undefined) -> all_remaining -> {ok, emqx_utils_stream:consume(S), undefined}; BatchNum when is_integer(BatchNum) -> - match_messages(undefined, Topic, {S, BatchNum}) + match_messages(State, Topic, {S, BatchNum}) end; match_messages(_State, _Topic, {S0, BatchNum}) -> case emqx_utils_stream:consume(BatchNum, S0) of @@ -246,16 +248,12 @@ page_read(_State, Topic, Page, Limit) -> ) ), NSkip = (Page - 1) * Limit, - case emqx_utils_stream:consume(NSkip, S1) of - {_, S2} -> - case emqx_utils_stream:consume(Limit, S2) of - {Rows, _S3} -> - {ok, true, Rows}; - Rows when is_list(Rows) -> - {ok, false, Rows} - end; + S2 = emqx_utils_stream:drop(NSkip, S1), + case emqx_utils_stream:consume(Limit, S2) of + {Rows, _S3} -> + {ok, true, Rows}; Rows when is_list(Rows) -> - {ok, false, []} + {ok, false, Rows} end. clean(_) -> @@ -362,24 +360,17 @@ search_stream(Index, FilterTokens, Now) -> fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, TopicStream ), - LookedUpRetainMsgStream = emqx_utils_stream:map( - fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, + RetainMsgStream = emqx_utils_stream:chainmap( + fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end, MatchingTopicStream ), - FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter( - fun - ([#retained_message{expiry_time = ExpiryTime}]) -> - ExpiryTime =:= 0 orelse ExpiryTime >= Now; - ([]) -> - false + ValidRetainMsgStream = emqx_utils_stream:filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =:= 0 orelse ExpiryTime > Now end, - LookedUpRetainMsgStream + RetainMsgStream ), - RetainMsgStream = emqx_utils_stream:map( - fun([RetainedMsg]) -> RetainedMsg end, - FoundAndValidLookedUpRetainMsgStream - ), - RetainMsgStream. + ValidRetainMsgStream. match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). @@ -708,7 +699,7 @@ are_indices_updated(Indices) -> ets_stream(Tab) -> emqx_utils_stream:ets( fun - (undefined) -> ets:match_object(Tab, '_', 1); + (undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE); (Cont) -> ets:match_object(Cont) end ). diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index a0787dd1d..726a0880d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -242,10 +242,7 @@ interleave_stop_test() -> ets_test() -> T = ets:new(tab, [ordered_set]), Objects = [{N, N} || N <- lists:seq(1, 10)], - lists:foreach( - fun(Object) -> ets:insert(T, Object) end, - Objects - ), + ets:insert(T, Objects), S = emqx_utils_stream:ets( fun (undefined) -> ets:match_object(T, '_', 4);