From 61eda0ff31f9c1cc5d2ce96c68d75e030a17e5e4 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 27 Jun 2024 17:20:04 +0300 Subject: [PATCH] feat(queue): identify agents by SessionId in tests --- .../src/emqx_ds_shared_sub_agent.erl | 12 ++-- .../src/emqx_ds_shared_sub_group_sm.erl | 10 ++++ .../src/emqx_ds_shared_sub_leader.erl | 21 ++++--- .../src/emqx_ds_shared_sub_proto.erl | 55 +++++++++++++++---- 4 files changed, 69 insertions(+), 29 deletions(-) diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 5e27f290a..11cb011d3 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -143,7 +143,7 @@ delete_group_subscription(State, _ShareTopicFilter) -> State. add_group_subscription( - #{groups := Groups0} = State0, ShareTopicFilter + #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter ) -> ?SLOG(info, #{ msg => agent_add_group_subscription, @@ -152,8 +152,9 @@ add_group_subscription( #share{group = Group} = ShareTopicFilter, Groups1 = Groups0#{ Group => emqx_ds_shared_sub_group_sm:new(#{ + session_id => SessionId, topic_filter => ShareTopicFilter, - agent => this_agent(), + agent => this_agent(SessionId), send_after => send_to_subscription_after(Group) }) }, @@ -172,11 +173,8 @@ fetch_stream_events(#{groups := Groups0} = State0) -> State1 = State0#{groups => Groups1}, {lists:concat(Events), State1}. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -this_agent() -> self(). +this_agent(Id) -> + emqx_ds_shared_sub_proto:agent(Id, self()). send_to_subscription_after(Group) -> fun(Time, Msg) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 1bf023e56..a16b2bcf9 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -37,6 +37,7 @@ ]). -type options() :: #{ + session_id := emqx_persistent_session_ds:id(), agent := emqx_ds_shared_sub_proto:agent(), topic_filter := emqx_persistent_session_ds:share_topic_filter(), send_after := fun((non_neg_integer(), term()) -> reference()) @@ -131,6 +132,7 @@ -spec new(options()) -> group_sm(). new(#{ + session_id := SessionId, agent := Agent, topic_filter := ShareTopicFilter, send_after := SendAfter @@ -144,6 +146,7 @@ new(#{ } ), GSM0 = #{ + id => SessionId, topic_filter => ShareTopicFilter, agent => Agent, send_after => SendAfter @@ -231,6 +234,7 @@ handle_updating(GSM0) -> handle_leader_update_streams( #{ + id := Id, state := ?replaying, state_data := #{streams := Streams0, version := VersionOld} = StateData } = GSM, @@ -238,6 +242,12 @@ handle_leader_update_streams( VersionNew, StreamProgresses ) -> + ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + id => Id, + version_old => VersionOld, + version_new => VersionNew, + stream_progresses => emqx_ds_shared_sub_proto:format_streams(StreamProgresses) + }), {AddEvents, Streams1} = lists:foldl( fun(#{stream := Stream, iterator := It}, {AddEventAcc, StreamsAcc}) -> case maps:is_key(Stream, StreamsAcc) of diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 64a74510a..e1437dc36 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -270,12 +270,12 @@ renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic Data3 = assign_streams(Data2), Data3. -%% We revoke streams from agents that have too many streams (> desired_streams_per_agent). +%% We revoke streams from agents that have too many streams (> desired_stream_count_per_agent). %% We revoke only from replaying agents. %% After revoking, no unassigned streams appear. Streams will become unassigned %% only after agents report them as acked and unsubscribed. revoke_streams(Data0) -> - DesiredStreamsPerAgent = desired_streams_per_agent(Data0), + DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0), Agents = replaying_agents(Data0), lists:foldl( fun(Agent, DataAcc) -> @@ -326,10 +326,10 @@ select_streams_for_revoke( %% * data locality (agents better preserve streams with data available on the agent's node) lists:sublist(shuffle(Streams), RevokeCount). -%% We assign streams to agents that have too few streams (< desired_streams_per_agent). +%% We assign streams to agents that have too few streams (< desired_stream_count_per_agent). %% We assign only to replaying agents. assign_streams(Data0) -> - DesiredStreamsPerAgent = desired_streams_per_agent(Data0), + DesiredStreamsPerAgent = desired_stream_count_per_agent(Data0), Agents = replaying_agents(Data0), lists:foldl( fun(Agent, DataAcc) -> @@ -384,8 +384,7 @@ connect_agent( agent => Agent, group => Group }), - DesiredCount = desired_streams_per_agent(Data), - % DesiredCount = desired_streams_for_new_agent(Data), + DesiredCount = desired_stream_count_for_new_agent(Data), assign_initial_streams_to_agent(Data, Agent, DesiredCount). assign_initial_streams_to_agent(Data, Agent, AssignCount) -> @@ -708,13 +707,13 @@ replaying_agents(#{agents := AgentStates}) -> maps:to_list(AgentStates) ). -desired_streams_per_agent(#{agents := AgentStates} = Data) -> - desired_streams_per_agent(Data, maps:size(AgentStates)). +desired_stream_count_per_agent(#{agents := AgentStates} = Data) -> + desired_stream_count_per_agent(Data, maps:size(AgentStates)). -desired_streams_for_new_agent(#{agents := AgentStates} = Data) -> - desired_streams_per_agent(Data, maps:size(AgentStates) + 1). +desired_stream_count_for_new_agent(#{agents := AgentStates} = Data) -> + desired_stream_count_per_agent(Data, maps:size(AgentStates) + 1). -desired_streams_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> +desired_stream_count_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> case AgentCount of 0 -> 0; diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 01f63aaad..d7d85b8f2 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -23,7 +23,21 @@ leader_invalidate/2 ]). +-export([ + format_streams/1, + agent/2 +]). + +-ifdef(TEST). +-record(agent, { + pid :: pid(), + id :: term() +}). +-type agent() :: #agent{}. +-else. -type agent() :: pid(). +-endif. + -type leader() :: pid(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type group() :: emqx_types:group(). @@ -107,7 +121,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> version => Version }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - ToAgent, + agent_pid(ToAgent), ?leader_lease_streams(OfGroup, Leader, Streams, Version) ), ok. @@ -121,7 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) -> version => Version }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - ToAgent, + agent_pid(ToAgent), ?leader_renew_stream_lease(OfGroup, Version) ), ok. @@ -136,7 +150,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> version_new => VersionNew }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - ToAgent, + agent_pid(ToAgent), ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) ), ok. @@ -152,7 +166,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> streams_new => format_streams(StreamsNew) }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - ToAgent, + agent_pid(ToAgent), ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) ), ok. @@ -165,11 +179,36 @@ leader_invalidate(ToAgent, OfGroup) -> of_group => OfGroup }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - ToAgent, + agent_pid(ToAgent), ?leader_invalidate(OfGroup) ), ok. +%%-------------------------------------------------------------------- +%% Internal API +%%-------------------------------------------------------------------- + +-ifdef(TEST). +agent(Id, Pid) -> + #agent{id = Id, pid = Pid}. + +agent_pid(#agent{pid = Pid}) -> + Pid. + +-else. +agent(_Id, Pid) -> + Pid. + +agent_pid(Pid) -> + Pid. +-endif. + +format_streams(Streams) -> + lists:map( + fun format_stream/1, + Streams + ). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- @@ -177,11 +216,5 @@ leader_invalidate(ToAgent, OfGroup) -> format_opaque(Opaque) -> erlang:phash2(Opaque). -format_streams(Streams) -> - lists:map( - fun format_stream/1, - Streams - ). - format_stream(#{stream := Stream, iterator := Iterator} = Value) -> Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.