feat(queue): implement resubscribe test

This commit is contained in:
Ilya Averyanov 2024-07-03 18:14:16 +03:00
parent fada2a3fea
commit e5547005eb
2 changed files with 84 additions and 22 deletions

View File

@ -390,19 +390,25 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
%% Handle a newly connected agent %% Handle a newly connected agent
connect_agent( connect_agent(
#{group := Group} = Data, #{group := Group, agents := Agents} = Data,
Agent, Agent,
AgentMetadata AgentMetadata
) -> ) ->
%% TODO
%% implement graceful reconnection of the same agent
?SLOG(info, #{ ?SLOG(info, #{
msg => leader_agent_connected, msg => leader_agent_connected,
agent => Agent, agent => Agent,
group => Group group => Group
}), }),
DesiredCount = desired_stream_count_for_new_agent(Data), case Agents of
assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount). #{Agent := AgentState} ->
?tp(warning, shared_sub_leader_agent_already_connected, #{
agent => Agent
}),
reconnect_agent(Data, Agent, AgentMetadata, AgentState);
_ ->
DesiredCount = desired_stream_count_for_new_agent(Data),
assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount)
end.
assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) -> assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) ->
InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount), InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount),
@ -412,6 +418,30 @@ assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) ->
), ),
set_agent_state(Data1, Agent, AgentState). set_agent_state(Data1, Agent, AgentState).
reconnect_agent(
Data0,
Agent,
AgentMetadata,
#{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState
) ->
?tp(warning, shared_sub_leader_agent_reconnect, #{
agent => Agent,
agent_metadata => AgentMetadata,
inherited_streams => OldStreams
}),
AgentState = agent_transition_to_initial_waiting_replaying(
Data0, Agent, AgentMetadata, OldStreams
),
Data1 = set_agent_state(Data0, Agent, AgentState),
%% If client reconnected gracefully then it either had already sent all the final progresses
%% for the revoked streams (so `OldRevokedStreams` should be empty) or it had not started
%% to replay them (if we revoked streams after it desided to reconnect). So we can safely
%% unassign them.
%%
%% If client reconnects after a crash, then we wouldn't be here (the agent identity will be new).
Data2 = unassign_streams(Data1, OldRevokedStreams),
Data2.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Disconnect agent gracefully %% Disconnect agent gracefully

View File

@ -223,15 +223,8 @@ t_intensive_reassign(_Config) ->
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual( ?assertEqual([], Missing),
[], ?assertEqual([], Duplicate),
Missing
),
?assertEqual(
[],
Duplicate
),
ok = emqtt:disconnect(ConnShared1), ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnShared2),
@ -276,15 +269,54 @@ t_unsubscribe(_Config) ->
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual( ?assertEqual([], Missing),
[], ?assertEqual([], Duplicate),
Missing
),
?assertEqual( ok = emqtt:disconnect(ConnShared1),
[], ok = emqtt:disconnect(ConnShared2),
Duplicate ok = emqtt:disconnect(ConnPub).
),
t_quick_resubscribe(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
ConnShared1 = emqtt_connect_sub(<<"client_shared1">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1),
ct:sleep(1000),
NPubs = 10_000,
Topics = [<<"topic10/1">>, <<"topic10/2">>, <<"topic10/3">>],
ok = publish_n(ConnPub, Topics, 1, NPubs),
Self = self(),
_ = spawn_link(fun() ->
ok = publish_n(ConnPub, Topics, NPubs + 1, 2 * NPubs),
Self ! publish_done
end),
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr10/topic10/#">>, 1),
{ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>),
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1),
receive
publish_done -> ok
end,
Pubs = drain_publishes(),
ClientByBid = fun(Pid) ->
case Pid of
ConnShared1 -> <<"client_shared1">>;
ConnShared2 -> <<"client_shared2">>
end
end,
{Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid),
?assertEqual([], Missing),
?assertEqual([], Duplicate),
ok = emqtt:disconnect(ConnShared1), ok = emqtt:disconnect(ConnShared1),
ok = emqtt:disconnect(ConnShared2), ok = emqtt:disconnect(ConnShared2),