From 13166e6ca851aa97018834214bb797f0295de815 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Sat, 11 May 2024 14:32:08 +0300 Subject: [PATCH] chore(retainer): get rid of qlc usage --- .../src/emqx_retainer_mnesia.erl | 273 ++++++++++-------- .../test/emqx_retainer_SUITE.erl | 103 ++----- 2 files changed, 173 insertions(+), 203 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index bdc1f2c67..67e7529dd 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -22,7 +22,6 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). --include_lib("stdlib/include/qlc.hrl"). %% emqx_retainer callbacks -export([ @@ -60,7 +59,6 @@ -define(META_KEY, index_meta). --define(CLEAR_BATCH_SIZE, 1000). -define(REINDEX_BATCH_SIZE, 1000). -define(REINDEX_DISPATCH_WAIT, 30000). -define(REINDEX_RPC_RETRY_INTERVAL, 1000). @@ -167,15 +165,20 @@ clear_expired(_) -> clear_expired() -> NowMs = erlang:system_time(millisecond), - QH = qlc:q([ - RetainedMsg - || #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg <- ets:table(?TAB_MESSAGE), - (ExpiryTime =/= 0) and (ExpiryTime < NowMs) - ]), - QC = qlc:cursor(QH), - clear_batch(dirty_indices(write), QC). + S0 = ets_stream(?TAB_MESSAGE), + S1 = stream_filter( + fun(#retained_message{expiry_time = ExpiryTime}) -> + ExpiryTime =/= 0 andalso ExpiryTime < NowMs + end, + S0 + ), + DirtyWriteIndices = dirty_indices(write), + stream_foreach( + fun(RetainedMsg) -> + delete_message_with_indices(RetainedMsg, DirtyWriteIndices) + end, + S1 + ). delete_message(_State, Topic) -> Tokens = topic_to_tokens(Topic), @@ -183,13 +186,11 @@ delete_message(_State, Topic) -> false -> ok = delete_message_by_topic(Tokens, dirty_indices(write)); true -> - QH = search_table(Tokens, 0), - qlc:fold( - fun(RetainedMsg, _) -> - ok = delete_message_with_indices(RetainedMsg, dirty_indices(write)) - end, - undefined, - QH + S = search_stream(Tokens, 0), + DirtyWriteIndices = dirty_indices(write), + stream_foreach( + fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, + S ) end. @@ -199,54 +200,49 @@ read_message(_State, Topic) -> match_messages(_State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), - QH = msg_table(search_table(Tokens, Now)), + S = msg_stream(search_stream(Tokens, Now)), case batch_read_number() of all_remaining -> - {ok, qlc:eval(QH), undefined}; + {ok, emqx_utils_stream:consume(S), undefined}; BatchNum when is_integer(BatchNum) -> - Cursor = qlc:cursor(QH), - match_messages(undefined, Topic, {Cursor, BatchNum}) + match_messages(undefined, Topic, {S, BatchNum}) end; -match_messages(_State, _Topic, {Cursor, BatchNum}) -> - case qlc_next_answers(Cursor, BatchNum) of - {closed, Rows} -> - {ok, Rows, undefined}; - {more, Rows} -> - {ok, Rows, {Cursor, BatchNum}} +match_messages(_State, _Topic, {S0, BatchNum}) -> + case emqx_utils_stream:consume(BatchNum, S0) of + {Rows, S1} -> + {ok, Rows, {S1, BatchNum}}; + Rows when is_list(Rows) -> + {ok, Rows, undefined} end. page_read(_State, Topic, Page, Limit) -> Now = erlang:system_time(millisecond), - QH = + S0 = case Topic of undefined -> - msg_table(search_table(undefined, ['#'], Now)); + msg_stream(search_stream(undefined, ['#'], Now)); _ -> Tokens = topic_to_tokens(Topic), - msg_table(search_table(Tokens, Now)) + msg_stream(search_stream(Tokens, Now)) end, - OrderedQH = qlc:sort(QH, {order, fun compare_message/2}), - Cursor = qlc:cursor(OrderedQH), + %% This is very inefficient, but we are limited with inherited API + S1 = emqx_utils_stream:list( + lists:sort( + fun compare_message/2, + emqx_utils_stream:consume(S0) + ) + ), NSkip = (Page - 1) * Limit, - SkipResult = - case NSkip > 0 of - true -> - {Result, _} = qlc_next_answers(Cursor, NSkip), - Result; - false -> - more - end, - case SkipResult of - closed -> - {ok, false, []}; - more -> - case qlc_next_answers(Cursor, Limit) of - {closed, Rows} -> - {ok, false, Rows}; - {more, Rows} -> - qlc:delete_cursor(Cursor), - {ok, true, Rows} - end + case emqx_utils_stream:consume(NSkip, S1) of + {_, S2} -> + case emqx_utils_stream:consume(Limit, S2) of + {Rows, _S3} -> + {ok, true, Rows}; + Rows when is_list(Rows) -> + {ok, false, Rows} + end; + Rows when is_list(Rows) -> + {ok, false, []} end. clean(_) -> @@ -318,58 +314,63 @@ do_store_retained_index(Key, ExpiryTime) -> }, mnesia:write(?TAB_INDEX, RetainedIndex, write). -msg_table(SearchTable) -> - qlc:q([ - Msg - || #retained_message{ - msg = Msg - } <- SearchTable - ]). +msg_stream(SearchStream) -> + emqx_utils_stream:map( + fun(#retained_message{msg = Msg}) -> Msg end, + SearchStream + ). -search_table(Tokens, Now) -> +search_stream(Tokens, Now) -> Indices = dirty_indices(read), Index = emqx_retainer_index:select_index(Tokens, Indices), - search_table(Index, Tokens, Now). + search_stream(Index, Tokens, Now). -search_table(undefined, Tokens, Now) -> +search_stream(undefined, Tokens, Now) -> Ms = make_message_match_spec(Tokens, Now), - ets:table(?TAB_MESSAGE, [{traverse, {select, Ms}}]); -search_table(Index, FilterTokens, Now) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1); + (Cont) -> ets:select(Cont) + end + ); +search_stream(Index, FilterTokens, Now) -> {Ms, IsExactMs} = make_index_match_spec(Index, FilterTokens, Now), - Topics = [ - emqx_retainer_index:restore_topic(Key) - || #retained_index{key = Key} <- ets:select(?TAB_INDEX, Ms) - ], - RetainedMsgQH = qlc:q([ - ets:lookup(?TAB_MESSAGE, TopicTokens) - || TopicTokens <- Topics, match(IsExactMs, TopicTokens, FilterTokens) - ]), - qlc:q([ - RetainedMsg - || [ - #retained_message{ - expiry_time = ExpiryTime - } = RetainedMsg - ] <- RetainedMsgQH, - (ExpiryTime == 0) or (ExpiryTime > Now) - ]). + IndexRecordStream = emqx_utils_stream:ets( + fun + (undefined) -> ets:select(?TAB_INDEX, Ms, 1); + (Cont) -> ets:select(Cont) + end + ), + TopicStream = emqx_utils_stream:map( + fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end, + IndexRecordStream + ), + MatchingTopicStream = stream_filter( + fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, + TopicStream + ), + LookedUpRetainMsgStream = emqx_utils_stream:map( + fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, + MatchingTopicStream + ), + FoundAndValidLookedUpRetainMsgStream = stream_filter( + fun + ([#retained_message{expiry_time = ExpiryTime}]) -> + ExpiryTime =:= 0 orelse ExpiryTime >= Now; + ([]) -> + false + end, + LookedUpRetainMsgStream + ), + RetainMsgStream = emqx_utils_stream:map( + fun([RetainedMsg]) -> RetainedMsg end, + FoundAndValidLookedUpRetainMsgStream + ), + RetainMsgStream. match(_IsExactMs = true, _TopicTokens, _FilterTokens) -> true; match(_IsExactMs = false, TopicTokens, FilterTokens) -> emqx_topic:match(TopicTokens, FilterTokens). -clear_batch(Indices, QC) -> - {Result, Rows} = qlc_next_answers(QC, ?CLEAR_BATCH_SIZE), - lists:foreach( - fun(RetainedMsg) -> - delete_message_with_indices(RetainedMsg, Indices) - end, - Rows - ), - case Result of - closed -> ok; - more -> clear_batch(Indices, QC) - end. - delete_message_by_topic(TopicTokens, Indices) -> case mnesia:dirty_read(?TAB_MESSAGE, TopicTokens) of [] -> ok; @@ -409,21 +410,6 @@ read_messages(Topic) -> end end. -qlc_next_answers(QC, N) -> - case qlc:next_answers(QC, N) of - NextAnswers when - is_list(NextAnswers) andalso - length(NextAnswers) < N - -> - qlc:delete_cursor(QC), - {closed, NextAnswers}; - NextAnswers when is_list(NextAnswers) -> - {more, NextAnswers}; - {error, Module, Reason} -> - qlc:delete_cursor(QC), - error({qlc_error, Module, Reason}) - end. - make_message_match_spec(Tokens, NowMs) -> Cond = emqx_retainer_index:condition(Tokens), MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'}, @@ -552,8 +538,11 @@ reindex(NewIndices, Force, StatusFun) when {atomic, ok} = mria:clear_table(?TAB_INDEX), %% Fill index records in batches. - QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]), - ok = reindex_batch(qlc:cursor(QH), 0, StatusFun), + TopicStream = emqx_utils_stream:map( + fun(#retained_message{topic = Topic}) -> Topic end, + ets_stream(?TAB_MESSAGE) + ), + ok = reindex_batch(TopicStream, 0, StatusFun), %% Enable read indices and unlock reindexing. finalize_reindex(); @@ -631,12 +620,12 @@ reindex_topic(Indices, Topic) -> ok end. -reindex_batch(QC, Done, StatusFun) -> - case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of - {atomic, {more, NewDone}} -> +reindex_batch(Stream0, Done, StatusFun) -> + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [Stream0, Done]) of + {atomic, {more, NewDone, Stream1}} -> _ = StatusFun(NewDone), - reindex_batch(QC, NewDone, StatusFun); - {atomic, {closed, NewDone}} -> + reindex_batch(Stream1, NewDone, StatusFun); + {atomic, {done, NewDone}} -> _ = StatusFun(NewDone), ok; {aborted, Reason} -> @@ -647,14 +636,26 @@ reindex_batch(QC, Done, StatusFun) -> {error, Reason} end. -do_reindex_batch(QC, Done) -> +do_reindex_batch(Stream0, Done) -> Indices = db_indices(write), - {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), + Result = emqx_utils_stream:consume(?REINDEX_BATCH_SIZE, Stream0), + Topics = + case Result of + {Rows, _Stream1} -> + Rows; + Rows when is_list(Rows) -> + Rows + end, ok = lists:foreach( fun(Topic) -> reindex_topic(Indices, Topic) end, Topics ), - {Status, Done + length(Topics)}. + case Result of + {_Rows, Stream1} -> + {more, Done + length(Topics), Stream1}; + _Rows -> + {done, Done + length(Topics)} + end. wait_dispatch_complete(Timeout) -> Nodes = mria:running_nodes(), @@ -690,3 +691,37 @@ are_indices_updated(Indices) -> _ -> false end. + +ets_stream(Tab) -> + emqx_utils_stream:ets( + fun + (undefined) -> ets:match_object(Tab, '$1', 1); + (Cont) -> ets:match_object(Cont) + end + ). + +stream_foreach(F, S) -> + case emqx_utils_stream:next(S) of + [X | Rest] -> + F(X), + stream_foreach(F, Rest); + [] -> + ok + end. + +%% TODO: move to emqx_utils_stream +stream_filter(F, S) -> + FilterNext = fun FilterNext(St) -> + case emqx_utils_stream:next(St) of + [X | Rest] -> + case F(X) of + true -> + [X | stream_filter(F, Rest)]; + false -> + FilterNext(Rest) + end; + [] -> + [] + end + end, + fun() -> FilterNext(S) end. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 993684ded..0ac0e1588 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -78,31 +78,25 @@ end_per_suite(Config) -> emqx_cth_suite:stop(?config(suite_apps, Config)). init_per_group(mnesia_without_indices, Config) -> - mnesia:clear_table(?TAB_INDEX_META), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), - Config; + [{index, false} | Config]; init_per_group(mnesia_reindex, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config; init_per_group(_, Config) -> - emqx_retainer_mnesia:populate_index_meta(), - mnesia:clear_table(?TAB_INDEX), - mnesia:clear_table(?TAB_MESSAGE), Config. end_per_group(_Group, Config) -> emqx_retainer_mnesia:populate_index_meta(), Config. -init_per_testcase(t_get_basic_usage_info, Config) -> +init_per_testcase(_TestCase, Config) -> + case ?config(index, Config) of + false -> + mnesia:clear_table(?TAB_INDEX_META); + _ -> + emqx_retainer_mnesia:populate_index_meta() + end, mnesia:clear_table(?TAB_INDEX), mnesia:clear_table(?TAB_MESSAGE), - emqx_retainer_mnesia:populate_index_meta(), - Config; -init_per_testcase(_TestCase, Config) -> Config. app_spec() -> @@ -310,7 +304,7 @@ t_message_expiry(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_message_expiry_2(Config) -> ConfMod = fun(Conf) -> @@ -332,9 +326,9 @@ t_message_expiry_2(Config) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_table_full(_) -> +t_table_full(Config) -> ConfMod = fun(Conf) -> Conf#{<<"backend">> => #{<<"max_retained_messages">> => <<"1">>}} end, @@ -351,7 +345,7 @@ t_table_full(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_clean(Config) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -446,7 +440,7 @@ t_flow_control(_) -> Diff = End - Begin, ?assert( - Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9), + Diff > timer:seconds(2.5) andalso Diff < timer:seconds(4), lists:flatten(io_lib:format("Diff is :~p~n", [Diff])) ), @@ -462,67 +456,7 @@ t_flow_control(_) -> }), ok. -t_cursor_cleanup(_) -> - Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), - LimiterCfg = make_limiter_cfg(Rate), - JsonCfg = make_limiter_json(<<"1/1s">>), - emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), - emqx_retainer:update_config(#{ - <<"delivery_rate">> => <<"1/1s">>, - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => JsonCfg - } - }), - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), - lists:foreach( - fun(I) -> - emqtt:publish( - C1, - <<"retained/", (integer_to_binary(I))/binary>>, - <<"this is a retained message">>, - [{qos, 0}, {retain, true}] - ) - end, - lists:seq(1, 5) - ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), - - snabbkaffe:start_trace(), - - ?assertWaitEvent( - emqtt:disconnect(C1), - #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>}, - 2000 - ), - - QLCProcesses = lists:filter( - fun(Pid) -> - {current_function, {qlc, wait_for_request, 3}} =:= - erlang:process_info(Pid, current_function) - end, - erlang:processes() - ), - - ?assertEqual(0, length(QLCProcesses)), - - snabbkaffe:stop(), - - emqx_limiter_server:del_bucket(emqx_retainer, internal), - emqx_retainer:update_config(#{ - <<"flow_control">> => - #{ - <<"batch_read_number">> => 1, - <<"batch_deliver_number">> => 1 - } - }), - - ok. - -t_clear_expired(_) -> +t_clear_expired(Config) -> ConfMod = fun(Conf) -> Conf#{ <<"msg_clear_interval">> := <<"1s">>, @@ -558,9 +492,9 @@ t_clear_expired(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). -t_max_payload_size(_) -> +t_max_payload_size(Config) -> ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end, Case = fun() -> emqx_retainer:clean(), @@ -589,7 +523,7 @@ t_max_payload_size(_) -> ok = emqtt:disconnect(C1) end, - with_conf(ConfMod, Case). + with_conf(Config, ConfMod, Case). t_page_read(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -896,10 +830,11 @@ receive_messages(Count, Msgs) -> Msgs end. -with_conf(ConfMod, Case) -> +with_conf(CTConfig, ConfMod, Case) -> Conf = emqx:get_raw_config([retainer]), NewConf = ConfMod(Conf), emqx_retainer:update_config(NewConf), + ?config(index, CTConfig) =:= false andalso mria:clear_table(?TAB_INDEX_META), try Case(), {ok, _} = emqx_retainer:update_config(Conf)