From e5547005eb1cca333d5903cdbcda764e5befdbba Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 3 Jul 2024 18:14:16 +0300 Subject: [PATCH] feat(queue): implement resubscribe test --- .../src/emqx_ds_shared_sub_leader.erl | 40 +++++++++-- .../test/emqx_ds_shared_sub_SUITE.erl | 66 ++++++++++++++----- 2 files changed, 84 insertions(+), 22 deletions(-) diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index ce38a72f9..8f6b7c683 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -390,19 +390,25 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> %% Handle a newly connected agent connect_agent( - #{group := Group} = Data, + #{group := Group, agents := Agents} = Data, Agent, AgentMetadata ) -> - %% TODO - %% implement graceful reconnection of the same agent ?SLOG(info, #{ msg => leader_agent_connected, agent => Agent, group => Group }), - DesiredCount = desired_stream_count_for_new_agent(Data), - assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount). + case Agents of + #{Agent := AgentState} -> + ?tp(warning, shared_sub_leader_agent_already_connected, #{ + agent => Agent + }), + reconnect_agent(Data, Agent, AgentMetadata, AgentState); + _ -> + DesiredCount = desired_stream_count_for_new_agent(Data), + assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount) + end. assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) -> InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount), @@ -412,6 +418,30 @@ assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) -> ), set_agent_state(Data1, Agent, AgentState). +reconnect_agent( + Data0, + Agent, + AgentMetadata, + #{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState +) -> + ?tp(warning, shared_sub_leader_agent_reconnect, #{ + agent => Agent, + agent_metadata => AgentMetadata, + inherited_streams => OldStreams + }), + AgentState = agent_transition_to_initial_waiting_replaying( + Data0, Agent, AgentMetadata, OldStreams + ), + Data1 = set_agent_state(Data0, Agent, AgentState), + %% If client reconnected gracefully then it either had already sent all the final progresses + %% for the revoked streams (so `OldRevokedStreams` should be empty) or it had not started + %% to replay them (if we revoked streams after it desided to reconnect). So we can safely + %% unassign them. + %% + %% If client reconnects after a crash, then we wouldn't be here (the agent identity will be new). + Data2 = unassign_streams(Data1, OldRevokedStreams), + Data2. + %%-------------------------------------------------------------------- %% Disconnect agent gracefully diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index defc90c78..4c2e9a239 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -223,15 +223,8 @@ t_intensive_reassign(_Config) -> {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), - ?assertEqual( - [], - Missing - ), - - ?assertEqual( - [], - Duplicate - ), + ?assertEqual([], Missing), + ?assertEqual([], Duplicate), ok = emqtt:disconnect(ConnShared1), ok = emqtt:disconnect(ConnShared2), @@ -276,15 +269,54 @@ t_unsubscribe(_Config) -> {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), - ?assertEqual( - [], - Missing - ), + ?assertEqual([], Missing), + ?assertEqual([], Duplicate), - ?assertEqual( - [], - Duplicate - ), + ok = emqtt:disconnect(ConnShared1), + ok = emqtt:disconnect(ConnShared2), + ok = emqtt:disconnect(ConnPub). + +t_quick_resubscribe(_Config) -> + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + ConnShared1 = emqtt_connect_sub(<<"client_shared1">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1), + + ct:sleep(1000), + + NPubs = 10_000, + + Topics = [<<"topic10/1">>, <<"topic10/2">>, <<"topic10/3">>], + ok = publish_n(ConnPub, Topics, 1, NPubs), + + Self = self(), + _ = spawn_link(fun() -> + ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs), + Self ! publish_done + end), + + ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), + {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr10/topic10/#">>, 1), + {ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1), + + receive + publish_done -> ok + end, + + Pubs = drain_publishes(), + + ClientByBid = fun(Pid) -> + case Pid of + ConnShared1 -> <<"client_shared1">>; + ConnShared2 -> <<"client_shared2">> + end + end, + + {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), + + ?assertEqual([], Missing), + ?assertEqual([], Duplicate), ok = emqtt:disconnect(ConnShared1), ok = emqtt:disconnect(ConnShared2),