feat(queue): send metadata with agent when connecting to leader

The metadata will be used to attach taints to agents, in order to improve stream assignment.
This commit is contained in:
Ilya Averyanov 2024-06-27 18:49:47 +03:00
parent 49bff5c08a
commit 1d728a05b2
6 changed files with 57 additions and 29 deletions

View File

@ -177,7 +177,7 @@ fetch_stream_events(
%% Connecting state %% Connecting state
handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
handle_leader_lease_streams( handle_leader_lease_streams(
@ -201,7 +201,7 @@ handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) ->
GSM. GSM.
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT), GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
GSM1. GSM1.
@ -444,6 +444,9 @@ transition(GSM0, NewState, NewStateData, LeaseEvents) ->
}, },
run_enter_callback(GSM2). run_enter_callback(GSM2).
%% Build the metadata map that is passed along with the agent to
%% emqx_ds_shared_sub_registry:lookup_leader/3.
%% Only the `id` entry of the GSM map is carried over; the clause does not
%% match (function_clause) when the argument is not a map with an `id` key.
agent_metadata(#{id := AgentId}) ->
    #{id => AgentId}.
ensure_state_timeout(GSM0, Name, Delay) -> ensure_state_timeout(GSM0, Name, Delay) ->
ensure_state_timeout(GSM0, Name, Delay, Name). ensure_state_timeout(GSM0, Name, Delay, Name).

View File

@ -43,6 +43,7 @@
state := emqx_ds_shared_sub_agent:status(), state := emqx_ds_shared_sub_agent:status(),
prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
version := emqx_ds_shared_sub_proto:version(), version := emqx_ds_shared_sub_proto:version(),
agent_metadata := emqx_ds_shared_sub_proto:agent_metadata(),
streams := list(emqx_ds:stream()), streams := list(emqx_ds:stream()),
revoked_streams := list(emqx_ds:stream()) revoked_streams := list(emqx_ds:stream())
}. }.
@ -182,9 +183,11 @@ handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0)
{keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; {keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}};
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% agent events %% agent events
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_active, Data0) -> handle_event(
info, ?agent_connect_leader_match(Agent, AgentMetadata, _TopicFilter), ?leader_active, Data0
) ->
% ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}), % ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}),
Data1 = connect_agent(Data0, Agent), Data1 = connect_agent(Data0, Agent, AgentMetadata),
{keep_state, Data1}; {keep_state, Data1};
handle_event( handle_event(
info, info,
@ -375,7 +378,8 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
connect_agent( connect_agent(
#{group := Group} = Data, #{group := Group} = Data,
Agent Agent,
AgentMetadata
) -> ) ->
%% TODO %% TODO
%% implement graceful reconnection of the same agent %% implement graceful reconnection of the same agent
@ -385,13 +389,13 @@ connect_agent(
group => Group group => Group
}), }),
DesiredCount = desired_stream_count_for_new_agent(Data), DesiredCount = desired_stream_count_for_new_agent(Data),
assign_initial_streams_to_agent(Data, Agent, DesiredCount). assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount).
assign_initial_streams_to_agent(Data, Agent, AssignCount) -> assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) ->
InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount), InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount),
Data1 = set_stream_ownership_to_agent(Data, Agent, InitialStreamsToAssign), Data1 = set_stream_ownership_to_agent(Data, Agent, InitialStreamsToAssign),
AgentState = agent_transition_to_initial_waiting_replaying( AgentState = agent_transition_to_initial_waiting_replaying(
Data1, Agent, InitialStreamsToAssign Data1, Agent, AgentMetadata, InitialStreamsToAssign
), ),
set_agent_state(Data1, Agent, AgentState). set_agent_state(Data1, Agent, AgentState).
@ -639,7 +643,7 @@ agent_transition_to_waiting_replaying(
}. }.
agent_transition_to_initial_waiting_replaying( agent_transition_to_initial_waiting_replaying(
#{group := Group} = Data, Agent, InitialStreams #{group := Group} = Data, Agent, AgentMetadata, InitialStreams
) -> ) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{ ?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent, agent => Agent,
@ -653,6 +657,7 @@ agent_transition_to_initial_waiting_replaying(
Agent, Group, Leader, StreamProgresses, Version Agent, Group, Leader, StreamProgresses, Version
), ),
#{ #{
metadata => AgentMetadata,
state => ?waiting_replaying, state => ?waiting_replaying,
version => Version, version => Version,
prev_version => undefined, prev_version => undefined,

View File

@ -13,7 +13,7 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ -export([
agent_connect_leader/3, agent_connect_leader/4,
agent_update_stream_states/4, agent_update_stream_states/4,
agent_update_stream_states/5, agent_update_stream_states/5,
@ -34,6 +34,9 @@
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group(). -type group() :: emqx_types:group().
-type version() :: non_neg_integer(). -type version() :: non_neg_integer().
%% Metadata that an agent passes to the leader in the connect message
%% (see agent_connect_leader/4). It currently has a single mandatory key, `id`.
-type agent_metadata() :: #{
id := emqx_persistent_session_ds:id()
}.
-type stream_progress() :: #{ -type stream_progress() :: #{
stream := emqx_ds:stream(), stream := emqx_ds:stream(),
@ -51,7 +54,9 @@
leader/0, leader/0,
group/0, group/0,
version/0, version/0,
stream_progress/0 stream_progress/0,
agent_stream_progress/0,
agent_metadata/0
]). ]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -60,19 +65,22 @@
%% agent -> leader messages %% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. -spec agent_connect_leader(leader(), agent(), agent_metadata(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) when ?is_local_leader(ToLeader) -> agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => agent_connect_leader, type => agent_connect_leader,
to_leader => ToLeader, to_leader => ToLeader,
from_agent => FromAgent, from_agent => FromAgent,
agent_metadata => AgentMetadata,
topic_filter => TopicFilter topic_filter => TopicFilter
}), }),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, TopicFilter)),
ok; ok;
agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) ->
emqx_ds_shared_sub_proto_v1:agent_connect_leader( emqx_ds_shared_sub_proto_v1:agent_connect_leader(
?leader_node(ToLeader), ToLeader, FromAgent, TopicFilter ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, TopicFilter
). ).
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.

View File

@ -20,15 +20,17 @@
%% Agent messages sent to the leader. %% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender. %% Leader talks to many agents, `agent` field is used to identify the sender.
-define(agent_connect_leader(Agent, TopicFilter), #{ -define(agent_connect_leader(Agent, AgentMetadata, TopicFilter), #{
type => ?agent_connect_leader_msg, type => ?agent_connect_leader_msg,
topic_filter => TopicFilter, topic_filter => TopicFilter,
agent_metadata => AgentMetadata,
agent => Agent agent => Agent
}). }).
-define(agent_connect_leader_match(Agent, TopicFilter), #{ -define(agent_connect_leader_match(Agent, AgentMetadata, TopicFilter), #{
type := ?agent_connect_leader_msg, type := ?agent_connect_leader_msg,
topic_filter := TopicFilter, topic_filter := TopicFilter,
agent_metadata := AgentMetadata,
agent := Agent agent := Agent
}). }).

View File

@ -20,11 +20,12 @@
]). ]).
-export([ -export([
lookup_leader/2 lookup_leader/3
]). ]).
-record(lookup_leader, { -record(lookup_leader, {
agent :: emqx_ds_shared_sub_proto:agent(), agent :: emqx_ds_shared_sub_proto:agent(),
agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(),
topic_filter :: emqx_persistent_session_ds:share_topic_filter() topic_filter :: emqx_persistent_session_ds:share_topic_filter()
}). }).
@ -35,10 +36,14 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec lookup_leader( -spec lookup_leader(
emqx_ds_shared_sub_proto:agent(), emqx_persistent_session_ds:share_topic_filter() emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_persistent_session_ds:share_topic_filter()
) -> ok. ) -> ok.
lookup_leader(Agent, TopicFilter) -> lookup_leader(Agent, AgentMetadata, TopicFilter) ->
gen_server:cast(?MODULE, #lookup_leader{agent = Agent, topic_filter = TopicFilter}). gen_server:cast(?MODULE, #lookup_leader{
agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter
}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal API %% Internal API
@ -66,8 +71,10 @@ init([]) ->
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}. {reply, {error, unknown_request}, State}.
handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) -> handle_cast(
State1 = do_lookup_leader(Agent, TopicFilter, State), #lookup_leader{agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter}, State
) ->
State1 = do_lookup_leader(Agent, AgentMetadata, TopicFilter, State),
{noreply, State1}. {noreply, State1}.
handle_info(_Info, State) -> handle_info(_Info, State) ->
@ -80,7 +87,7 @@ terminate(_Reason, _State) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
do_lookup_leader(Agent, TopicFilter, State) -> do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12309 %% TODO https://emqx.atlassian.net/browse/EMQX-12309
%% Cluster-wide unique leader election should be implemented %% Cluster-wide unique leader election should be implemented
Id = emqx_ds_shared_sub_leader:id(TopicFilter), Id = emqx_ds_shared_sub_leader:id(TopicFilter),
@ -107,5 +114,7 @@ do_lookup_leader(Agent, TopicFilter, State) ->
topic_filter => TopicFilter, topic_filter => TopicFilter,
leader => LeaderPid leader => LeaderPid
}), }),
ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter), ok = emqx_ds_shared_sub_proto:agent_connect_leader(
LeaderPid, Agent, AgentMetadata, TopicFilter
),
State. State.

View File

@ -11,7 +11,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
agent_connect_leader/4, agent_connect_leader/5,
agent_update_stream_states/5, agent_update_stream_states/5,
agent_update_stream_states/6, agent_update_stream_states/6,
@ -29,11 +29,12 @@ introduced_in() ->
node(), node(),
emqx_ds_shared_sub_proto:leader(), emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_ds_shared_sub_proto:topic_filter() emqx_ds_shared_sub_proto:topic_filter()
) -> ok. ) -> ok.
agent_connect_leader(Node, ToLeader, FromAgent, TopicFilter) -> agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, TopicFilter) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [ erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [
ToLeader, FromAgent, TopicFilter ToLeader, FromAgent, AgentMetadata, TopicFilter
]). ]).
-spec agent_update_stream_states( -spec agent_update_stream_states(