diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 616414112..bf0798e1a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -152,7 +152,7 @@ to_map(_S, _SharedSubS) -> stream_progresses(S) -> fold_shared_stream_states( fun(TopicFilter, Stream, SRS, Acc) -> - #srs{it_begin = BeginIt} = SRS, + #srs{it_end = EndIt} = SRS, case is_stream_fully_acked(S, SRS) of true -> @@ -161,7 +161,7 @@ stream_progresses(S) -> StreamProgress = #{ topic_filter => TopicFilter, stream => Stream, - iterator => BeginIt, + iterator => EndIt, use_finished => is_use_finished(S, SRS) }, [StreamProgress | Acc]; diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index d563c0115..ecd06846c 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -540,7 +540,7 @@ update_stream_progresses( }. clean_revoked_streams( - Data0, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses + Data0, _Agent, #{revoked_streams := RevokedStreams0} = AgentState0, ReceivedStreamProgresses ) -> FinishedReportedStreams = maps:from_list( lists:filtermap( @@ -569,13 +569,7 @@ clean_revoked_streams( {AgentState1, Data1}. unassign_streams(#{stream_owners := StreamOwners0} = Data, Streams) -> - StreamOwners1 = lists:foldl( - fun(Stream, StreamOwnersAcc) -> - maps:remove(Stream, StreamOwnersAcc) - end, - StreamOwners0, - Streams - ), + StreamOwners1 = maps:without(Streams, StreamOwners0), Data#{ stream_owners => StreamOwners1 }. @@ -591,7 +585,9 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers %% Client started updating Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), AgentState1 = update_agent_timeout(AgentState0), - {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses), + {AgentState2, Data2} = clean_revoked_streams( + Data1, Agent, AgentState1, AgentStreamProgresses + ), AgentState3 = case AgentState2 of #{revoked_streams := []} -> @@ -603,7 +599,9 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers {?updating, AgentPrevVersion, AgentVersion} -> Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), AgentState1 = update_agent_timeout(AgentState0), - {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses), + {AgentState2, Data2} = clean_revoked_streams( + Data1, Agent, AgentState1, AgentStreamProgresses + ), AgentState3 = case AgentState2 of #{revoked_streams := []} -> diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index e9d83d4fb..3e80b44a9 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -10,7 +10,6 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/asserts.hrl"). all() -> @@ -184,6 +183,89 @@ t_graceful_disconnect(_Config) -> ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnPub). +t_intensive_reassign(_Config) -> + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr8/topic8/#">>, 1), + + ct:sleep(1000), + + NPubs = 10_000, + + Topics = [<<"topic8/1">>, <<"topic8/2">>, <<"topic8/3">>], + ok = publish_n(ConnPub, Topics, 1, NPubs), + + Self = self(), + _ = spawn_link(fun() -> + ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs), + Self ! publish_done + end), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + ConnShared3 = emqtt_connect_sub(<<"client_shared3">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr8/topic8/#">>, 1), + {ok, _, _} = emqtt:subscribe(ConnShared3, <<"$share/gr8/topic8/#">>, 1), + + receive + publish_done -> ok + end, + + Pubs = drain_publishes(), + + ClientByBid = fun(Pid) -> + case Pid of + ConnShared1 -> <<"client_shared1">>; + ConnShared2 -> <<"client_shared2">>; + ConnShared3 -> <<"client_shared3">> + end + end, + + Messages = lists:foldl( + fun(#{payload := Payload, client_pid := Pid}, Acc) -> + maps:update_with( + binary_to_integer(Payload), + fun(Clients) -> + [ClientByBid(Pid) | Clients] + end, + [ClientByBid(Pid)], + Acc + ) + end, + #{}, + Pubs + ), + + Missing = lists:filter( + fun(N) -> not maps:is_key(N, Messages) end, + lists:seq(1, 2 * NPubs) + ), + Duplicate = lists:filtermap( + fun(N) -> + case Messages of + #{N := [_]} -> false; + #{N := [_ | _] = Clients} -> {true, {N, Clients}}; + _ -> false + end + end, + lists:seq(1, 2 * NPubs) + ), + + ?assertEqual( + [], + Missing + ), + + ?assertEqual( + [], + Duplicate + ), + + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnShared3), + ok = emqtt:disconnect(ConnPub). + t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), @@ -265,3 +347,20 @@ terminate_leaders() -> ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup), {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup), ok. + +publish_n(_Conn, _Topics, From, To) when From > To -> + ok; +publish_n(Conn, [Topic | RestTopics], From, To) -> + {ok, _} = emqtt:publish(Conn, Topic, integer_to_binary(From), 1), + publish_n(Conn, RestTopics ++ [Topic], From + 1, To). + +drain_publishes() -> + drain_publishes([]). + +drain_publishes(Acc) -> + receive + {publish, Msg} -> + drain_publishes([Msg | Acc]) + after 5_000 -> + lists:reverse(Acc) + end.