feat(queue): identify agents by SessionId in tests

This commit is contained in:
Ilya Averyanov 2024-06-27 17:20:04 +03:00
parent 8f0d807c00
commit 61eda0ff31
4 changed files with 69 additions and 29 deletions

View File

@ -143,7 +143,7 @@ delete_group_subscription(State, _ShareTopicFilter) ->
State. State.
add_group_subscription( add_group_subscription(
#{groups := Groups0} = State0, ShareTopicFilter #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => agent_add_group_subscription, msg => agent_add_group_subscription,
@ -152,8 +152,9 @@ add_group_subscription(
#share{group = Group} = ShareTopicFilter, #share{group = Group} = ShareTopicFilter,
Groups1 = Groups0#{ Groups1 = Groups0#{
Group => emqx_ds_shared_sub_group_sm:new(#{ Group => emqx_ds_shared_sub_group_sm:new(#{
session_id => SessionId,
topic_filter => ShareTopicFilter, topic_filter => ShareTopicFilter,
agent => this_agent(), agent => this_agent(SessionId),
send_after => send_to_subscription_after(Group) send_after => send_to_subscription_after(Group)
}) })
}, },
@ -172,11 +173,8 @@ fetch_stream_events(#{groups := Groups0} = State0) ->
State1 = State0#{groups => Groups1}, State1 = State0#{groups => Groups1},
{lists:concat(Events), State1}. {lists:concat(Events), State1}.
%%-------------------------------------------------------------------- this_agent(Id) ->
%% Internal functions emqx_ds_shared_sub_proto:agent(Id, self()).
%%--------------------------------------------------------------------
this_agent() -> self().
send_to_subscription_after(Group) -> send_to_subscription_after(Group) ->
fun(Time, Msg) -> fun(Time, Msg) ->

View File

@ -37,6 +37,7 @@
]). ]).
-type options() :: #{ -type options() :: #{
session_id := emqx_persistent_session_ds:id(),
agent := emqx_ds_shared_sub_proto:agent(), agent := emqx_ds_shared_sub_proto:agent(),
topic_filter := emqx_persistent_session_ds:share_topic_filter(), topic_filter := emqx_persistent_session_ds:share_topic_filter(),
send_after := fun((non_neg_integer(), term()) -> reference()) send_after := fun((non_neg_integer(), term()) -> reference())
@ -131,6 +132,7 @@
-spec new(options()) -> group_sm(). -spec new(options()) -> group_sm().
new(#{ new(#{
session_id := SessionId,
agent := Agent, agent := Agent,
topic_filter := ShareTopicFilter, topic_filter := ShareTopicFilter,
send_after := SendAfter send_after := SendAfter
@ -144,6 +146,7 @@ new(#{
} }
), ),
GSM0 = #{ GSM0 = #{
id => SessionId,
topic_filter => ShareTopicFilter, topic_filter => ShareTopicFilter,
agent => Agent, agent => Agent,
send_after => SendAfter send_after => SendAfter
@ -231,6 +234,7 @@ handle_updating(GSM0) ->
handle_leader_update_streams( handle_leader_update_streams(
#{ #{
id := Id,
state := ?replaying, state := ?replaying,
state_data := #{streams := Streams0, version := VersionOld} = StateData state_data := #{streams := Streams0, version := VersionOld} = StateData
} = GSM, } = GSM,
@ -238,6 +242,12 @@ handle_leader_update_streams(
VersionNew, VersionNew,
StreamProgresses StreamProgresses
) -> ) ->
?tp(warning, shared_sub_group_sm_leader_update_streams, #{
id => Id,
version_old => VersionOld,
version_new => VersionNew,
stream_progresses => emqx_ds_shared_sub_proto:format_streams(StreamProgresses)
}),
{AddEvents, Streams1} = lists:foldl( {AddEvents, Streams1} = lists:foldl(
fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) -> fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) ->
case maps:is_key(Stream, StreamsAcc) of case maps:is_key(Stream, StreamsAcc) of

View File

@ -270,12 +270,12 @@ renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic
Data3 = assign_streams(Data2), Data3 = assign_streams(Data2),
Data3. Data3.
%% We revoke streams from agents that have too many streams (> desired_streams_per_agent). %% We revoke streams from agents that have too many streams (> desired_stream_count_per_agent).
%% We revoke only from replaying agents. %% We revoke only from replaying agents.
%% After revoking, no unassigned streams appear. Streams will become unassigned %% After revoking, no unassigned streams appear. Streams will become unassigned
%% only after agents report them as acked and unsubscribed. %% only after agents report them as acked and unsubscribed.
revoke_streams(Data0) -> revoke_streams(Data0) ->
DesiredStreamsPerAgent = desired_streams_per_agent(Data0), DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0),
Agents = replaying_agents(Data0), Agents = replaying_agents(Data0),
lists:foldl( lists:foldl(
fun(Agent, DataAcc) -> fun(Agent, DataAcc) ->
@ -326,10 +326,10 @@ select_streams_for_revoke(
%% * data locality (agents better preserve streams with data available on the agent's node) %% * data locality (agents better preserve streams with data available on the agent's node)
lists:sublist(shuffle(Streams), RevokeCount). lists:sublist(shuffle(Streams), RevokeCount).
%% We assign streams to agents that have too few streams (< desired_streams_per_agent). %% We assign streams to agents that have too few streams (< desired_stream_count_per_agent).
%% We assign only to replaying agents. %% We assign only to replaying agents.
assign_streams(Data0) -> assign_streams(Data0) ->
DesiredStreamsPerAgent = desired_streams_per_agent(Data0), DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0),
Agents = replaying_agents(Data0), Agents = replaying_agents(Data0),
lists:foldl( lists:foldl(
fun(Agent, DataAcc) -> fun(Agent, DataAcc) ->
@ -384,8 +384,7 @@ connect_agent(
agent => Agent, agent => Agent,
group => Group group => Group
}), }),
DesiredCount = desired_streams_per_agent(Data), DesiredCount = desired_stream_count_for_new_agent(Data),
% DesiredCount = desired_streams_for_new_agent(Data),
assign_initial_streams_to_agent(Data, Agent, DesiredCount). assign_initial_streams_to_agent(Data, Agent, DesiredCount).
assign_initial_streams_to_agent(Data, Agent, AssignCount) -> assign_initial_streams_to_agent(Data, Agent, AssignCount) ->
@ -708,13 +707,13 @@ replaying_agents(#{agents := AgentStates}) ->
maps:to_list(AgentStates) maps:to_list(AgentStates)
). ).
desired_streams_per_agent(#{agents := AgentStates} = Data) -> desired_stream_count_per_agent(#{agents := AgentStates} = Data) ->
desired_streams_per_agent(Data, maps:size(AgentStates)). desired_stream_count_per_agent(Data, maps:size(AgentStates)).
desired_streams_for_new_agent(#{agents := AgentStates} = Data) -> desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) ->
desired_streams_per_agent(Data, maps:size(AgentStates) + 1). desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1).
desired_streams_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) ->
case AgentCount of case AgentCount of
0 -> 0 ->
0; 0;

View File

@ -23,7 +23,21 @@
leader_invalidate/2 leader_invalidate/2
]). ]).
-export([
format_streams/1,
agent/2
]).
-ifdef(TEST).
-record(agent, {
pid :: pid(),
id :: term()
}).
-type agent() :: #agent{}.
-else.
-type agent() :: pid(). -type agent() :: pid().
-endif.
-type leader() :: pid(). -type leader() :: pid().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group(). -type group() :: emqx_types:group().
@ -107,7 +121,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
version => Version version => Version
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent, agent_pid(ToAgent),
?leader_lease_streams(OfGroup, Leader, Streams, Version) ?leader_lease_streams(OfGroup, Leader, Streams, Version)
), ),
ok. ok.
@ -121,7 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
version => Version version => Version
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent, agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, Version) ?leader_renew_stream_lease(OfGroup, Version)
), ),
ok. ok.
@ -136,7 +150,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
version_new => VersionNew version_new => VersionNew
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent, agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
), ),
ok. ok.
@ -152,7 +166,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
streams_new => format_streams(StreamsNew) streams_new => format_streams(StreamsNew)
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent, agent_pid(ToAgent),
?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
), ),
ok. ok.
@ -165,11 +179,36 @@ leader_invalidate(ToAgent, OfGroup) ->
of_group => OfGroup of_group => OfGroup
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent, agent_pid(ToAgent),
?leader_invalidate(OfGroup) ?leader_invalidate(OfGroup)
), ),
ok. ok.
%%--------------------------------------------------------------------
%% Internal API
%%--------------------------------------------------------------------
-ifdef(TEST).
agent(Id, Pid) ->
#agent{id = Id, pid = Pid}.
agent_pid(#agent{pid = Pid}) ->
Pid.
-else.
agent(_Id, Pid) ->
Pid.
agent_pid(Pid) ->
Pid.
-endif.
format_streams(Streams) ->
lists:map(
fun format_stream/1,
Streams
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -177,11 +216,5 @@ leader_invalidate(ToAgent, OfGroup) ->
format_opaque(Opaque) -> format_opaque(Opaque) ->
erlang:phash2(Opaque). erlang:phash2(Opaque).
format_streams(Streams) ->
lists:map(
fun format_stream/1,
Streams
).
format_stream(#{stream := Stream, iterator := Iterator} = Value) -> format_stream(#{stream := Stream, iterator := Iterator} = Value) ->
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.