diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index a16b2bcf9..cd029d8df 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -177,7 +177,7 @@ fetch_stream_events( %% Connecting state handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter), ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). handle_leader_lease_streams( @@ -201,7 +201,7 @@ handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) -> GSM. handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter), GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT), GSM1. @@ -444,6 +444,9 @@ transition(GSM0, NewState, NewStateData, LeaseEvents) -> }, run_enter_callback(GSM2). +agent_metadata(#{id := Id} = _GSM) -> + #{id => Id}. + ensure_state_timeout(GSM0, Name, Delay) -> ensure_state_timeout(GSM0, Name, Delay, Name). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index e1437dc36..379c0278f 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -43,6 +43,7 @@ state := emqx_ds_shared_sub_agent:status(), prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), version := emqx_ds_shared_sub_proto:version(), + agent_metadata := emqx_ds_shared_sub_proto:agent_metadata(), streams := list(emqx_ds:stream()), revoked_streams := list(emqx_ds:stream()) }. @@ -182,9 +183,11 @@ handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) {keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; %%-------------------------------------------------------------------- %% agent events -handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_active, Data0) -> +handle_event( + info, ?agent_connect_leader_match(Agent, AgentMetadata, _TopicFilter), ?leader_active, Data0 +) -> % ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}), - Data1 = connect_agent(Data0, Agent), + Data1 = connect_agent(Data0, Agent, AgentMetadata), {keep_state, Data1}; handle_event( info, @@ -375,7 +378,8 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> connect_agent( #{group := Group} = Data, - Agent + Agent, + AgentMetadata ) -> %% TODO %% implement graceful reconnection of the same agent @@ -385,13 +389,13 @@ connect_agent( group => Group }), DesiredCount = desired_stream_count_for_new_agent(Data), - assign_initial_streams_to_agent(Data, Agent, DesiredCount). + assign_initial_streams_to_agent(Data, Agent, AgentMetadata, DesiredCount). -assign_initial_streams_to_agent(Data, Agent, AssignCount) -> +assign_initial_streams_to_agent(Data, Agent, AgentMetadata, AssignCount) -> InitialStreamsToAssign = select_streams_for_assign(Data, Agent, AssignCount), Data1 = set_stream_ownership_to_agent(Data, Agent, InitialStreamsToAssign), AgentState = agent_transition_to_initial_waiting_replaying( - Data1, Agent, InitialStreamsToAssign + Data1, Agent, AgentMetadata, InitialStreamsToAssign ), set_agent_state(Data1, Agent, AgentState). @@ -639,7 +643,7 @@ agent_transition_to_waiting_replaying( }. agent_transition_to_initial_waiting_replaying( - #{group := Group} = Data, Agent, InitialStreams + #{group := Group} = Data, Agent, AgentMetadata, InitialStreams ) -> ?tp(warning, shared_sub_leader_agent_state_transition, #{ agent => Agent, @@ -653,6 +657,7 @@ agent_transition_to_initial_waiting_replaying( Agent, Group, Leader, StreamProgresses, Version ), #{ + metadata => AgentMetadata, state => ?waiting_replaying, version => Version, prev_version => undefined, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 363a16e46..ec0a25f14 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -13,7 +13,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ - agent_connect_leader/3, + agent_connect_leader/4, agent_update_stream_states/4, agent_update_stream_states/5, @@ -34,6 +34,9 @@ -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type group() :: emqx_types:group(). -type version() :: non_neg_integer(). +-type agent_metadata() :: #{ + id := emqx_persistent_session_ds:id() +}. -type stream_progress() :: #{ stream := emqx_ds:stream(), @@ -51,7 +54,9 @@ leader/0, group/0, version/0, - stream_progress/0 + stream_progress/0, + agent_stream_progress/0, + agent_metadata/0 ]). %%-------------------------------------------------------------------- @@ -60,19 +65,22 @@ %% agent -> leader messages --spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. -agent_connect_leader(ToLeader, FromAgent, TopicFilter) when ?is_local_leader(ToLeader) -> +-spec agent_connect_leader(leader(), agent(), agent_metadata(), topic_filter()) -> ok. +agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when + ?is_local_leader(ToLeader) +-> ?tp(warning, shared_sub_proto_msg, #{ type => agent_connect_leader, to_leader => ToLeader, from_agent => FromAgent, + agent_metadata => AgentMetadata, topic_filter => TopicFilter }), - _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), + _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, TopicFilter)), ok; -agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> +agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) -> emqx_ds_shared_sub_proto_v1:agent_connect_leader( - ?leader_node(ToLeader), ToLeader, FromAgent, TopicFilter + ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, TopicFilter ). -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index c9227ea2d..a2cf284f3 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -20,15 +20,17 @@ %% Agent messages sent to the leader. %% Leader talks to many agents, `agent` field is used to identify the sender. --define(agent_connect_leader(Agent, TopicFilter), #{ +-define(agent_connect_leader(Agent, AgentMetadata, TopicFilter), #{ type => ?agent_connect_leader_msg, topic_filter => TopicFilter, + agent_metadata => AgentMetadata, agent => Agent }). --define(agent_connect_leader_match(Agent, TopicFilter), #{ +-define(agent_connect_leader_match(Agent, AgentMetadata, TopicFilter), #{ type := ?agent_connect_leader_msg, topic_filter := TopicFilter, + agent_metadata := AgentMetadata, agent := Agent }). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl index 9b4a6bd11..bc732249a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -20,11 +20,12 @@ ]). -export([ - lookup_leader/2 + lookup_leader/3 ]). -record(lookup_leader, { agent :: emqx_ds_shared_sub_proto:agent(), + agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(), topic_filter :: emqx_persistent_session_ds:share_topic_filter() }). @@ -35,10 +36,14 @@ %%-------------------------------------------------------------------- -spec lookup_leader( - emqx_ds_shared_sub_proto:agent(), emqx_persistent_session_ds:share_topic_filter() + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:agent_metadata(), + emqx_persistent_session_ds:share_topic_filter() ) -> ok. -lookup_leader(Agent, TopicFilter) -> - gen_server:cast(?MODULE, #lookup_leader{agent = Agent, topic_filter = TopicFilter}). +lookup_leader(Agent, AgentMetadata, TopicFilter) -> + gen_server:cast(?MODULE, #lookup_leader{ + agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter + }). %%-------------------------------------------------------------------- %% Internal API @@ -66,8 +71,10 @@ init([]) -> handle_call(_Request, _From, State) -> {reply, {error, unknown_request}, State}. -handle_cast(#lookup_leader{agent = Agent, topic_filter = TopicFilter}, State) -> - State1 = do_lookup_leader(Agent, TopicFilter, State), +handle_cast( + #lookup_leader{agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter}, State +) -> + State1 = do_lookup_leader(Agent, AgentMetadata, TopicFilter, State), {noreply, State1}. handle_info(_Info, State) -> @@ -80,7 +87,7 @@ terminate(_Reason, _State) -> %% Internal functions %%-------------------------------------------------------------------- -do_lookup_leader(Agent, TopicFilter, State) -> +do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) -> %% TODO https://emqx.atlassian.net/browse/EMQX-12309 %% Cluster-wide unique leader election should be implemented Id = emqx_ds_shared_sub_leader:id(TopicFilter), @@ -107,5 +114,7 @@ do_lookup_leader(Agent, TopicFilter, State) -> topic_filter => TopicFilter, leader => LeaderPid }), - ok = emqx_ds_shared_sub_proto:agent_connect_leader(LeaderPid, Agent, TopicFilter), + ok = emqx_ds_shared_sub_proto:agent_connect_leader( + LeaderPid, Agent, AgentMetadata, TopicFilter + ), State. diff --git a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl index b0a132ea5..01c704cb0 100644 --- a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl +++ b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl @@ -11,7 +11,7 @@ -export([ introduced_in/0, - agent_connect_leader/4, + agent_connect_leader/5, agent_update_stream_states/5, agent_update_stream_states/6, @@ -29,11 +29,12 @@ introduced_in() -> node(), emqx_ds_shared_sub_proto:leader(), emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:agent_metadata(), emqx_ds_shared_sub_proto:topic_filter() ) -> ok. -agent_connect_leader(Node, ToLeader, FromAgent, TopicFilter) -> +agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, TopicFilter) -> erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [ - ToLeader, FromAgent, TopicFilter + ToLeader, FromAgent, AgentMetadata, TopicFilter ]). -spec agent_update_stream_states(