chore(retainer): move filter/foreach to emqx_utils_stream

This commit is contained in:
Ilya Averyanov 2024-05-13 11:51:20 +03:00
parent 0b39aaadbd
commit 552b62236c
2 changed files with 37 additions and 34 deletions

View File

@ -176,14 +176,14 @@ clear_expired(_) ->
clear_expired() ->
NowMs = erlang:system_time(millisecond),
S0 = ets_stream(?TAB_MESSAGE),
S1 = stream_filter(
S1 = emqx_utils_stream:filter(
fun(#retained_message{expiry_time = ExpiryTime}) ->
ExpiryTime =/= 0 andalso ExpiryTime < NowMs
end,
S0
),
DirtyWriteIndices = dirty_indices(write),
stream_foreach(
emqx_utils_stream:foreach(
fun(RetainedMsg) ->
delete_message_with_indices(RetainedMsg, DirtyWriteIndices)
end,
@ -198,7 +198,7 @@ delete_message(_State, Topic) ->
true ->
S = search_stream(Tokens, 0),
DirtyWriteIndices = dirty_indices(write),
stream_foreach(
emqx_utils_stream:foreach(
fun(RetainedMsg) -> delete_message_with_indices(RetainedMsg, DirtyWriteIndices) end,
S
)
@ -207,7 +207,7 @@ delete_message(_State, Topic) ->
read_message(_State, Topic) ->
{ok, read_messages(Topic)}.
match_messages(State, Topic, undefined) ->
match_messages(_State, Topic, undefined) ->
Tokens = topic_to_tokens(Topic),
Now = erlang:system_time(millisecond),
S = msg_stream(search_stream(Tokens, Now)),
@ -358,7 +358,7 @@ search_stream(Index, FilterTokens, Now) ->
fun(#retained_index{key = Key}) -> emqx_retainer_index:restore_topic(Key) end,
IndexRecordStream
),
MatchingTopicStream = stream_filter(
MatchingTopicStream = emqx_utils_stream:filter(
fun(TopicTokens) -> match(IsExactMs, TopicTokens, FilterTokens) end,
TopicStream
),
@ -366,7 +366,7 @@ search_stream(Index, FilterTokens, Now) ->
fun(TopicTokens) -> ets:lookup(?TAB_MESSAGE, TopicTokens) end,
MatchingTopicStream
),
FoundAndValidLookedUpRetainMsgStream = stream_filter(
FoundAndValidLookedUpRetainMsgStream = emqx_utils_stream:filter(
fun
([#retained_message{expiry_time = ExpiryTime}]) ->
ExpiryTime =:= 0 orelse ExpiryTime >= Now;
@ -708,33 +708,7 @@ are_indices_updated(Indices) ->
ets_stream(Tab) ->
emqx_utils_stream:ets(
fun
(undefined) -> ets:match_object(Tab, '$1', 1);
(undefined) -> ets:match_object(Tab, '_', 1);
(Cont) -> ets:match_object(Cont)
end
).
stream_foreach(F, S) ->
case emqx_utils_stream:next(S) of
[X | Rest] ->
F(X),
stream_foreach(F, Rest);
[] ->
ok
end.
%% TODO: move to emqx_utils_stream
stream_filter(F, S) ->
FilterNext = fun FilterNext(St) ->
case emqx_utils_stream:next(St) of
[X | Rest] ->
case F(X) of
true ->
[X | stream_filter(F, Rest)];
false ->
FilterNext(Rest)
end;
[] ->
[]
end
end,
fun() -> FilterNext(S) end.

View File

@ -23,6 +23,8 @@
const/1,
mqueue/1,
map/2,
filter/2,
foreach/2,
transpose/1,
chain/1,
chain/2,
@ -102,6 +104,33 @@ map(F, S) ->
end
end.
%% @doc Make a stream by filtering the underlying stream with a predicate function.
filter(F, S) ->
FilterNext = fun FilterNext(St) ->
case emqx_utils_stream:next(St) of
[X | Rest] ->
case F(X) of
true ->
[X | filter(F, Rest)];
false ->
FilterNext(Rest)
end;
[] ->
[]
end
end,
fun() -> FilterNext(S) end.
%% @doc Consumes the stream and applies the given function to each element.
foreach(F, S) ->
case emqx_utils_stream:next(S) of
[X | Rest] ->
F(X),
foreach(F, Rest);
[] ->
ok
end.
%% @doc Transpose a list of streams into a stream producing lists of their respective values.
%% The resulting stream is as long as the shortest of the input streams.
-spec transpose([stream(X)]) -> stream([X]).
@ -243,7 +272,7 @@ consume(N, S, Acc) ->
%% * `ets:match/1` / `ets:match/3`
%% * `ets:match_object/1` / `ets:match_object/3`
-spec ets(fun((Cont) -> select_result(Record, Cont))) -> stream(Record).
ets(ContF) ->
ets(ContF) when is_function(ContF) ->
ets(undefined, ContF).
ets(Cont, ContF) ->