diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index f0d194dfc..510d6a45f 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -71,7 +71,7 @@ rank_progress := emqx_ds_shared_sub_leader_rank_progress:t(), %% - %% Ephimeral data, should not be persisted + %% Ephemeral data, should not be persisted %% agents := #{ emqx_ds_shared_sub_proto:agent() => agent_state() diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl index 5cde51f16..fa611463d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl @@ -9,7 +9,8 @@ -export([ init/0, set_replayed/2, - add_streams/2 + add_streams/2, + replayed_up_to/2 ]). %% "shard" @@ -87,6 +88,15 @@ add_streams(StreamsWithRanks, State) -> SortedStreamsWithRanks ). +-spec replayed_up_to(emqx_ds:rank_x(), t()) -> emqx_ds:rank_y(). +replayed_up_to(RankX, State) -> + case State of + #{RankX := #{min_y := MinY}} -> + MinY; + _ -> + undefined + end. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -110,7 +120,7 @@ add_stream({{RankX, RankY}, Stream}, State0) -> update_min_y([{RankY, RankYStreams} | Rest] = Ys) -> case {has_unreplayed_streams(RankYStreams), Rest} of {true, _} -> - {RankY, maps:from_list(Ys)}; + {RankY - 1, maps:from_list(Ys)}; {false, []} -> {RankY - 1, #{}}; {false, _} -> @@ -122,3 +132,40 @@ has_unreplayed_streams(RankYStreams) -> fun(IsReplayed) -> not IsReplayed end, maps:values(RankYStreams) ). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +add_streams_set_replayed_test() -> + State0 = init(), + {_, State1} = add_streams( + [ + {{shard1, 1}, s111}, + {{shard1, 1}, s112}, + {{shard1, 2}, s121}, + {{shard1, 2}, s122}, + {{shard1, 3}, s131}, + {{shard1, 4}, s141}, + + {{shard3, 5}, s51} + ], + State0 + ), + ?assertEqual(0, replayed_up_to(shard1, State1)), + + State2 = set_replayed({{shard1, 1}, s111}, State1), + State3 = set_replayed({{shard1, 3}, s131}, State2), + ?assertEqual(0, replayed_up_to(shard1, State3)), + State4 = set_replayed({{shard1, 1}, s112}, State3), + ?assertEqual(1, replayed_up_to(shard1, State4)), + + State5 = set_replayed({{shard1, 2}, s121}, State4), + State6 = set_replayed({{shard1, 2}, s122}, State5), + + ?assertEqual(3, replayed_up_to(shard1, State6)), + + State7 = set_replayed({{shard1, 4}, s141}, State6), + ?assertEqual(3, replayed_up_to(shard1, State7)). + +%% -ifdef(TEST) end +-endif.