From 1a664c941b4371023ef8abf2fb8bbb3c49fcce6a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 15 May 2024 18:58:26 +0300 Subject: [PATCH] chore(retainer): scan table in batches, improve stream usage Co-authored-by: Thales Macedo Garitezi Co-authored-by: Zaiming (Stone) Shi --- .../src/emqx_retainer_mnesia.erl | 43 ++++++++----------- .../test/emqx_utils_stream_tests.erl | 5 +-- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index e61bd13fb..77297cdcd 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -68,6 +68,8 @@ -define(REINDEX_RPC_RETRY_INTERVAL, 1000). -define(REINDEX_INDEX_UPDATE_WAIT, 30000). +-define(MESSAGE_SCAN_BATCH_SIZE, 100). + %%-------------------------------------------------------------------- %% Management API %%-------------------------------------------------------------------- @@ -207,7 +209,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(_State, Topic, undefined) -> +match_messages(State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), S = msg_stream(search_stream(Tokens, Now)), @@ -215,7 +217,7 @@ match_messages(_State, Topic, undefined) -> all_remaining -> {ok, emqx_utils_stream:consume(S), undefined}; BatchNum when is_integer(BatchNum) -> - match_messages(undefined, Topic, {S, BatchNum}) + match_messages(State, Topic, {S, BatchNum}) end; match_messages(_State, _Topic, {S0, BatchNum}) -> case emqx_utils_stream:consume(BatchNum, S0) of @@ -246,16 +248,12 @@ page_read(_State, Topic, Page, Limit) -> ) ), NSkip = (Page - 1) * Limit, - case emqx_utils_stream:consume(NSkip, S1) of - {_, S2} -> - case emqx_utils_stream:consume(Limit, S2) of - {Rows, _S3} -> - {ok, true, Rows}; - Rows when is_list(Rows) -> - {ok, false, Rows} - end; + S2 = emqx_utils_stream:drop(NSkip, S1), + case emqx_utils_stream:consume(Limit, S2) of + {Rows, _S3} -> + {ok, true, Rows}; Rows when is_list(Rows) -> - {ok, false, []} + {ok, false, Rows} end. clean(_) -> @@ -362,24 +360,17 @@ search_stream(Index, FilterTokens, Now) -> fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, TopicStream ), - LookedUpRetainMsgStream = emqx_utils_stream:map( - fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, + RetainMsgStream = emqx_utils_stream:chainmap( + fun(TopicTokens) -> emqx_utils_stream:list(ets:lookup(?TAB_MESSAGE, TopicTokens)) end, MatchingTopicStream ), - FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter( - fun - ([#retained_message{expiry_time = ExpiryTime}]) -> - ExpiryTime =:= 0 orelse ExpiryTime >= Now; - ([]) -> - false + ValidRetainMsgStream = emqx_utils_stream:filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =:= 0 orelse ExpiryTime > Now end, - LookedUpRetainMsgStream + RetainMsgStream ), - RetainMsgStream = emqx_utils_stream:map( - fun([RetainedMsg]) -> RetainedMsg end, - FoundAndValidLookedUpRetainMsgStream - ), - RetainMsgStream. + ValidRetainMsgStream. match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). @@ -708,7 +699,7 @@ are_indices_updated(Indices) -> ets_stream(Tab) -> emqx_utils_stream:ets( fun - (undefined) -> ets:match_object(Tab, '_', 1); + (undefined) -> ets:match_object(Tab, '_', ?MESSAGE_SCAN_BATCH_SIZE); (Cont) -> ets:match_object(Cont) end ). diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index a0787dd1d..726a0880d 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -242,10 +242,7 @@ interleave_stop_test() -> ets_test() -> T = ets:new(tab, [ordered_set]), Objects = [{N, N} || N <- lists:seq(1, 10)], - lists:foreach( - fun(Object) -> ets:insert(T, Object) end, - Objects - ), + ets:insert(T, Objects), S = emqx_utils_stream:ets( fun (undefined) -> ets:match_object(T, '_', 4);