feat(queue): add leader_rank_progress test

This commit is contained in:
Ilya Averyanov 2024-07-04 20:51:53 +03:00
parent 91dd1183ad
commit 1496f7f778
2 changed files with 50 additions and 3 deletions

View File

@ -71,7 +71,7 @@
rank_progress := emqx_ds_shared_sub_leader_rank_progress:t(),
%%
%% Ephimeral data, should not be persisted
%% Ephemeral data, should not be persisted
%%
agents := #{
emqx_ds_shared_sub_proto:agent() => agent_state()

View File

@ -9,7 +9,8 @@
-export([
init/0,
set_replayed/2,
add_streams/2
add_streams/2,
replayed_up_to/2
]).
%% "shard"
@ -87,6 +88,15 @@ add_streams(StreamsWithRanks, State) ->
SortedStreamsWithRanks
).
-spec replayed_up_to(emqx_ds:rank_x(), t()) -> emqx_ds:rank_y().
replayed_up_to(RankX, State) ->
case State of
#{RankX := #{min_y := MinY}} ->
MinY;
_ ->
undefined
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@ -110,7 +120,7 @@ add_stream({{RankX, RankY}, Stream}, State0) ->
update_min_y([{RankY, RankYStreams} | Rest] = Ys) ->
case {has_unreplayed_streams(RankYStreams), Rest} of
{true, _} ->
{RankY, maps:from_list(Ys)};
{RankY - 1, maps:from_list(Ys)};
{false, []} ->
{RankY - 1, #{}};
{false, _} ->
@ -122,3 +132,40 @@ has_unreplayed_streams(RankYStreams) ->
fun(IsReplayed) -> not IsReplayed end,
maps:values(RankYStreams)
).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
add_streams_set_replayed_test() ->
State0 = init(),
{_, State1} = add_streams(
[
{{shard1, 1}, s111},
{{shard1, 1}, s112},
{{shard1, 2}, s121},
{{shard1, 2}, s122},
{{shard1, 3}, s131},
{{shard1, 4}, s141},
{{shard3, 5}, s51}
],
State0
),
?assertEqual(0, replayed_up_to(shard1, State1)),
State2 = set_replayed({{shard1, 1}, s111}, State1),
State3 = set_replayed({{shard1, 3}, s131}, State2),
?assertEqual(0, replayed_up_to(shard1, State3)),
State4 = set_replayed({{shard1, 1}, s112}, State3),
?assertEqual(1, replayed_up_to(shard1, State4)),
State5 = set_replayed({{shard1, 2}, s121}, State4),
State6 = set_replayed({{shard1, 2}, s122}, State5),
?assertEqual(3, replayed_up_to(shard1, State6)),
State7 = set_replayed({{shard1, 4}, s141}, State6),
?assertEqual(3, replayed_up_to(shard1, State7)).
%% -ifdef(TEST) end
-endif.