diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 90b4f40ca..e61bd13fb 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -176,14 +176,14 @@ clear_expired(_) -> clear_expired() -> NowMs = erlang:system_time(millisecond), S0 = ets_stream(?TAB_MESSAGE), - S1 = stream_filter( + S1 = emqx_utils_stream:filter( fun(#retained_message{expiry_time = ExpiryTime}) -> ExpiryTime =/= 0 andalso ExpiryTime < NowMs end, S0 ), DirtyWriteIndices = dirty_indices(write), - stream_foreach( + emqx_utils_stream:foreach( fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, @@ -198,7 +198,7 @@ delete_message(_State, Topic) -> true -> S = search_stream(Tokens, 0), DirtyWriteIndices = dirty_indices(write), - stream_foreach( + emqx_utils_stream:foreach( fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end, S ) @@ -207,7 +207,7 @@ delete_message(_State, Topic) -> read_message(_State, Topic) -> {ok, read_messages(Topic)}. -match_messages(State, Topic, undefined) -> +match_messages(_State, Topic, undefined) -> Tokens = topic_to_tokens(Topic), Now = erlang:system_time(millisecond), S = msg_stream(search_stream(Tokens, Now)), @@ -358,7 +358,7 @@ search_stream(Index, FilterTokens, Now) -> fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end, IndexRecordStream ), - MatchingTopicStream = stream_filter( + MatchingTopicStream = emqx_utils_stream:filter( fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end, TopicStream ), @@ -366,7 +366,7 @@ search_stream(Index, FilterTokens, Now) -> fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end, MatchingTopicStream ), - FoundAndValidLookedUpRetainMsgStream = stream_filter( + FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter( fun ([#retained_message{expiry_time = ExpiryTime}]) -> ExpiryTime =:= 0 orelse ExpiryTime >= Now; @@ -708,33 +708,7 @@ are_indices_updated(Indices) -> ets_stream(Tab) -> emqx_utils_stream:ets( fun - (undefined) -> ets:match_object(Tab, '$1', 1); + (undefined) -> ets:match_object(Tab, '_', 1); (Cont) -> ets:match_object(Cont) end ). - -stream_foreach(F, S) -> - case emqx_utils_stream:next(S) of - [X | Rest] -> - F(X), - stream_foreach(F, Rest); - [] -> - ok - end. - -%% TODO: move to emqx_utils_stream -stream_filter(F, S) -> - FilterNext = fun FilterNext(St) -> - case emqx_utils_stream:next(St) of - [X | Rest] -> - case F(X) of - true -> - [X | stream_filter(F, Rest)]; - false -> - FilterNext(Rest) - end; - [] -> - [] - end - end, - fun() -> FilterNext(S) end. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 8b6db8292..6ff43528b 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -23,6 +23,8 @@ const/1, mqueue/1, map/2, + filter/2, + foreach/2, transpose/1, chain/1, chain/2, @@ -102,6 +104,33 @@ map(F, S) -> end end. +%% @doc Make a stream by filtering the underlying stream with a predicate function. +filter(F, S) -> + FilterNext = fun FilterNext(St) -> + case emqx_utils_stream:next(St) of + [X | Rest] -> + case F(X) of + true -> + [X | filter(F, Rest)]; + false -> + FilterNext(Rest) + end; + [] -> + [] + end + end, + fun() -> FilterNext(S) end. + +%% @doc Consumes the stream and applies the given function to each element. +foreach(F, S) -> + case emqx_utils_stream:next(S) of + [X | Rest] -> + F(X), + foreach(F, Rest); + [] -> + ok + end. + %% @doc Transpose a list of streams into a stream producing lists of their respective values. %% The resulting stream is as long as the shortest of the input streams. -spec transpose([stream(X)]) -> stream([X]). @@ -243,7 +272,7 @@ consume(N, S, Acc) -> %% * `ets:match/1` / `ets:match/3` %% * `ets:match_object/1` / `ets:match_object/3` -spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record). -ets(ContF) -> +ets(ContF) when is_function(ContF) -> ets(undefined, ContF). ets(Cont, ContF) ->