From bceb5d43ed00b601154c35e04acd14790a3a0a38 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 26 Jun 2024 22:36:31 +0300 Subject: [PATCH] feat(queue): fix stream rebalancing issues, update tests --- .../src/emqx_ds_shared_sub_agent.erl | 2 +- .../src/emqx_ds_shared_sub_group_sm.erl | 61 ++++++-- .../src/emqx_ds_shared_sub_leader.erl | 141 +++++++++++++----- .../src/emqx_ds_shared_sub_proto.erl | 76 ++++++++++ .../test/emqx_ds_shared_sub_SUITE.erl | 35 ++--- 5 files changed, 239 insertions(+), 76 deletions(-) diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 6e43e0a65..5e27f290a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -190,7 +190,7 @@ send_to_subscription_after(Group) -> with_group_sm(State, Group, Fun) -> case State of #{groups := #{Group := GSM0} = Groups} -> - GSM1 = Fun(GSM0), + #{} = GSM1 = Fun(GSM0), State#{groups => Groups#{Group => GSM1}}; _ -> %% TODO diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index eb13e7147..1bf023e56 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -122,7 +122,8 @@ %% TODO https://emqx.atlassian.net/browse/EMQX-12574 %% Move to settings -define(FIND_LEADER_TIMEOUT, 1000). --define(RENEW_LEASE_TIMEOUT, 2000). +-define(RENEW_LEASE_TIMEOUT, 5000). +-define(MIN_UPDATE_STREAM_STATE_INTERVAL, 500). %%----------------------------------------------------------------------- %% API @@ -204,8 +205,12 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0 %%----------------------------------------------------------------------- %% Replaying state -handle_replaying(GSM) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT). +handle_replaying(GSM0) -> + GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT), + GSM2 = ensure_state_timeout( + GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL + ), + GSM2. handle_renew_lease_timeout(GSM) -> ?tp(debug, renew_lease_timeout, #{}), @@ -214,8 +219,12 @@ handle_renew_lease_timeout(GSM) -> %%----------------------------------------------------------------------- %% Updating state -handle_updating(GSM) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT). +handle_updating(GSM0) -> + GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT), + GSM2 = ensure_state_timeout( + GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL + ), + GSM2. %%----------------------------------------------------------------------- %% Common handlers @@ -223,7 +232,7 @@ handle_updating(GSM) -> handle_leader_update_streams( #{ state := ?replaying, - stream_data := #{streams := Streams0, version := VersionOld} = StateData + state_data := #{streams := Streams0, version := VersionOld} = StateData } = GSM, VersionOld, VersionNew, @@ -275,14 +284,19 @@ handle_leader_update_streams( handle_leader_update_streams( #{ state := ?updating, - stream_data := #{version := VersionNew} = _StreamData + state_data := #{version := VersionNew} = _StreamData } = GSM, _VersionOld, VersionNew, _StreamProgresses ) -> ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); -handle_leader_update_streams(GSM, _VersionOld, _VersionNew, _StreamProgresses) -> +handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) -> + ?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{ + gsm => GSM, + version_old => VersionOld, + version_new => VersionNew + }), %% Unexpected versions or state transition(GSM, ?connecting, #{}). @@ -311,7 +325,13 @@ handle_leader_renew_stream_lease( VersionNew ) -> ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); -handle_leader_renew_stream_lease(GSM, _VersionOld, _VersionNew) -> +handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) -> + ?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{ + gsm => GSM, + version_old => VersionOld, + version_new => VersionNew + }), + %% Unexpected versions or state transition(GSM, ?connecting, #{}). handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> @@ -319,32 +339,34 @@ handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) -> handle_stream_progress( #{ state := ?replaying, + agent := Agent, state_data := #{ - agent := Agent, leader := Leader, version := Version } - } = _GSM, + } = GSM, StreamProgresses ) -> ok = emqx_ds_shared_sub_proto:agent_update_stream_states( Leader, Agent, StreamProgresses, Version - ); + ), + ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL); handle_stream_progress( #{ state := ?updating, + agent := Agent, state_data := #{ - agent := Agent, leader := Leader, version := Version, prev_version := PrevVersion } - } = _GSM, + } = GSM, StreamProgresses ) -> ok = emqx_ds_shared_sub_proto:agent_update_stream_states( Leader, Agent, StreamProgresses, PrevVersion, Version - ). + ), + ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL). handle_leader_invalidate(GSM) -> transition(GSM, ?connecting, #{}). @@ -365,7 +387,14 @@ handle_state_timeout( renew_lease_timeout, _Message ) -> - handle_renew_lease_timeout(GSM). + handle_renew_lease_timeout(GSM); +handle_state_timeout( + GSM, + update_stream_state_timeout, + _Message +) -> + ?tp(debug, update_stream_state_timeout, #{}), + handle_stream_progress(GSM, []). handle_info( #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 3f2a85424..64a74510a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -75,7 +75,7 @@ %% States -define(leader_waiting_registration, leader_waiting_registration). --define(leader_replaying, leader_replaying). +-define(leader_active, leader_active). %% Events @@ -96,6 +96,8 @@ -define(AGENT_TIMEOUT, 5000). +-define(START_TIME_THRESHOLD, 5000). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -133,6 +135,7 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> group => Group, topic => Topic, router_id => gen_router_id(), + start_time => now_ms() - ?START_TIME_THRESHOLD, stream_progresses => #{}, stream_owners => #{}, agents => #{} @@ -146,37 +149,50 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist Self = self(), case Fun() of Self -> - {next_state, ?replaying, Data, {reply, From, {ok, Self}}}; + {next_state, ?leader_active, Data, {reply, From, {ok, Self}}}; OtherPid -> {stop_and_reply, normal, {reply, From, {ok, OtherPid}}} end; %%-------------------------------------------------------------------- %% repalying state -handle_event(enter, _OldState, ?leader_replaying, #{topic := Topic, router_id := RouterId} = _Data) -> +handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := RouterId} = _Data) -> + ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic, router_id => RouterId}), ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId), {keep_state_and_data, [ - {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}, - {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}, - {state_timeout, 0, #renew_streams{}} + {{timeout, #renew_streams{}}, 0, #renew_streams{}}, + {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}, + {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}} ]}; -handle_event(state_timeout, #renew_streams{}, ?leader_replaying, Data0) -> +%%-------------------------------------------------------------------- +%% timers +%% renew_streams timer +handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) -> + % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}), Data1 = renew_streams(Data0), - {keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}}; -handle_event(state_timeout, #renew_leases{}, ?leader_replaying, Data0) -> + {keep_state, Data1, {{timeout, #renew_streams{}}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}}; +%% renew_leases timer +handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) -> + % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}), Data1 = renew_leases(Data0), - {keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}}; -handle_event(state_timeout, #drop_timeout{}, ?leader_replaying, Data0) -> + {keep_state, Data1, {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}}; +%% drop_timeout timer +handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) -> + % ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}), Data1 = drop_timeout_agents(Data0), - {keep_state, Data1, {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; -handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_replaying, Data0) -> + {keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; +%%-------------------------------------------------------------------- +%% agent events +handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_active, Data0) -> + % ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}), Data1 = connect_agent(Data0, Agent), {keep_state, Data1}; handle_event( info, ?agent_update_stream_states_match(Agent, StreamProgresses, Version), - ?leader_replaying, + ?leader_active, Data0 ) -> + % ?tp(warning, shared_sub_leader_update_stream_states, #{agent => Agent, version => Version}), Data1 = with_agent(Data0, Agent, fun() -> update_agent_stream_states(Data0, Agent, StreamProgresses, Version) end), @@ -184,9 +200,12 @@ handle_event( handle_event( info, ?agent_update_stream_states_match(Agent, StreamProgresses, VersionOld, VersionNew), - ?leader_replaying, + ?leader_active, Data0 ) -> + % ?tp(warning, shared_sub_leader_update_stream_states, #{ + % agent => Agent, version_old => VersionOld, version_new => VersionNew + % }), Data1 = with_agent(Data0, Agent, fun() -> update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew) end), @@ -195,10 +214,11 @@ handle_event( %% fallback handle_event(enter, _OldState, _State, _Data) -> keep_state_and_data; -handle_event(Event, _Content, State, _Data) -> +handle_event(Event, Content, State, _Data) -> ?SLOG(warning, #{ msg => unexpected_event, event => Event, + content => Content, state => State }), keep_state_and_data. @@ -218,11 +238,10 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> %% * Revoke streams from agents having too many streams %% * Assign streams to agents having too few streams -renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) -> +renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic := Topic} = Data0) -> TopicFilter = emqx_topic:words(Topic), - StartTime = now_ms(), {_, Streams} = lists:unzip( - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, now_ms()) + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime) ), %% TODO https://emqx.atlassian.net/browse/EMQX-12572 %% Handle stream removal @@ -274,6 +293,12 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) -> false -> AgentState0; true -> + ?tp(warning, shared_sub_leader_revoke_streams, #{ + agent => Agent, + agent_stream_count => length(Streams0), + revoke_count => RevokeCount, + desired_count => DesiredCount + }), revoke_streams_from_agent(Data0, Agent, AgentState0, RevokeCount) end, set_agent_state(Data0, Agent, AgentState1). @@ -321,6 +346,12 @@ assign_lacking_streams(Data0, Agent, DesiredCount) -> false -> Data0; true -> + ?tp(warning, shared_sub_leader_assign_streams, #{ + agent => Agent, + agent_stream_count => length(Streams0), + assign_count => AssignCount, + desired_count => DesiredCount + }), assign_streams_to_agent(Data0, Agent, AssignCount) end. @@ -346,12 +377,15 @@ connect_agent( #{group := Group} = Data, Agent ) -> + %% TODO + %% implement graceful reconnection of the same agent ?SLOG(info, #{ msg => leader_agent_connected, agent => Agent, group => Group }), DesiredCount = desired_streams_per_agent(Data), + % DesiredCount = desired_streams_for_new_agent(Data), assign_initial_streams_to_agent(Data, Agent, DesiredCount). assign_initial_streams_to_agent(Data, Agent, AssignCount) -> @@ -388,6 +422,7 @@ drop_timeout_agents(#{agents := Agents} = Data) -> %% Send lease confirmations to agents renew_leases(#{agents := AgentStates} = Data) -> + ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), ok = lists:foreach( fun({Agent, AgentState}) -> renew_lease(Data, Agent, AgentState) @@ -407,11 +442,11 @@ renew_lease(#{group := Group} = Data, Agent, #{ ok = emqx_ds_shared_sub_proto:leader_update_streams( Agent, Group, PrevVersion, Version, StreamProgresses ), - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion); + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version); renew_lease(#{group := Group}, Agent, #{ state := ?updating, version := Version, prev_version := PrevVersion }) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion). + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version). %%-------------------------------------------------------------------- %% Handle stream progress updates from agent in replaying state @@ -427,7 +462,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) -> %% Agent finished updating, now replaying Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses), AgentState1 = update_agent_timeout(AgentState0), - AgentState2 = agent_transition_to_replaying(AgentState1), + AgentState2 = agent_transition_to_replaying(Agent, AgentState1), set_agent_state(Data1, Agent, AgentState2); {?replaying, AgentVersion} -> %% Common case, agent is replaying @@ -521,10 +556,10 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses), AgentState3 = case AgentState2 of - #{revoke_streams := []} -> - agent_transition_to_waiting_replaying(AgentState2); + #{revoked_streams := []} -> + agent_transition_to_waiting_replaying(Data1, Agent, AgentState2); _ -> - agent_transition_to_updating(AgentState2) + agent_transition_to_updating(Agent, AgentState2) end, set_agent_state(Data2, Agent, AgentState3); {?updating, AgentPrevVersion, AgentVersion} -> @@ -533,8 +568,8 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers {AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses), AgentState3 = case AgentState2 of - #{revoke_streams := []} -> - agent_transition_to_waiting_replaying(AgentState2); + #{revoked_streams := []} -> + agent_transition_to_waiting_replaying(Data1, Agent, AgentState2); _ -> AgentState2 end, @@ -566,10 +601,15 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers agent_transition_to_waiting_updating( #{group := Group} = Data, Agent, - #{version := Version, prev_version := undefined} = AgentState0, + #{state := OldState, version := Version, prev_version := undefined} = AgentState0, Streams, RevokedStreams ) -> + ?tp(warning, shared_sub_leader_agent_state_transition, #{ + agent => Agent, + old_state => OldState, + new_state => ?waiting_updating + }), NewVersion = next_version(Version), AgentState1 = AgentState0#{ @@ -585,7 +625,15 @@ agent_transition_to_waiting_updating( ), AgentState1. -agent_transition_to_waiting_replaying(AgentState0) -> +agent_transition_to_waiting_replaying( + #{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 +) -> + ?tp(warning, shared_sub_leader_agent_state_transition, #{ + agent => Agent, + old_state => OldState, + new_state => ?waiting_replaying + }), + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version), AgentState0#{ state => ?waiting_replaying, revoked_streams => [] @@ -594,6 +642,11 @@ agent_transition_to_waiting_replaying(AgentState0) -> agent_transition_to_initial_waiting_replaying( #{group := Group} = Data, Agent, InitialStreams ) -> + ?tp(warning, shared_sub_leader_agent_state_transition, #{ + agent => Agent, + old_state => none, + new_state => ?waiting_replaying + }), Version = 0, StreamProgresses = stream_progresses(Data, InitialStreams), Leader = this_leader(Data), @@ -609,13 +662,23 @@ agent_transition_to_initial_waiting_replaying( update_deadline => now_ms() + ?AGENT_TIMEOUT }. -agent_transition_to_replaying(#{state := ?waiting_replaying} = AgentState) -> +agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) -> + ?tp(warning, shared_sub_leader_agent_state_transition, #{ + agent => Agent, + old_state => ?waiting_replaying, + new_state => ?replaying + }), AgentState#{ state => ?replaying, prev_version => undefined }. -agent_transition_to_updating(#{state := ?waiting_updating} = AgentState) -> +agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState) -> + ?tp(warning, shared_sub_leader_agent_state_transition, #{ + agent => Agent, + old_state => ?waiting_updating, + new_state => ?updating + }), AgentState#{state => ?updating}. %%-------------------------------------------------------------------- @@ -645,14 +708,24 @@ replaying_agents(#{agents := AgentStates}) -> maps:to_list(AgentStates) ). -desired_streams_per_agent(#{agents := AgentStates, stream_progresses := StreamProgresses}) -> - AgentCount = maps:size(AgentStates), +desired_streams_per_agent(#{agents := AgentStates} = Data) -> + desired_streams_per_agent(Data, maps:size(AgentStates)). + +desired_streams_for_new_agent(#{agents := AgentStates} = Data) -> + desired_streams_per_agent(Data, maps:size(AgentStates) + 1). + +desired_streams_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) -> case AgentCount of 0 -> 0; _ -> StreamCount = maps:size(StreamProgresses), - (StreamCount div AgentCount) + 1 + case StreamCount rem AgentCount of + 0 -> + StreamCount div AgentCount; + _ -> + 1 + StreamCount div AgentCount + end end. stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 7d81de083..01f63aaad 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -9,6 +9,7 @@ -module(emqx_ds_shared_sub_proto). -include("emqx_ds_shared_sub_proto.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ agent_connect_leader/3, @@ -47,15 +48,32 @@ stream_progress/0 ]). +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + %% agent -> leader messages -spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => agent_connect_leader, + to_leader => ToLeader, + from_agent => FromAgent, + topic_filter => TopicFilter + }), _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), ok. -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => agent_update_stream_states, + to_leader => ToLeader, + from_agent => FromAgent, + stream_progresses => format_streams(StreamProgresses), + version => Version + }), _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), ok. @@ -63,6 +81,14 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> leader(), agent(), list(agent_stream_progress()), version(), version() ) -> ok. agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => agent_update_stream_states, + to_leader => ToLeader, + from_agent => FromAgent, + stream_progresses => format_streams(StreamProgresses), + version_old => VersionOld, + version_new => VersionNew + }), _ = erlang:send( ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) ), @@ -72,6 +98,14 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve -spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => leader_lease_streams, + to_agent => ToAgent, + of_group => OfGroup, + leader => Leader, + streams => format_streams(Streams), + version => Version + }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version) @@ -80,6 +114,12 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> -spec leader_renew_stream_lease(agent(), group(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, Version) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => leader_renew_stream_lease, + to_agent => ToAgent, + of_group => OfGroup, + version => Version + }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ToAgent, ?leader_renew_stream_lease(OfGroup, Version) @@ -88,6 +128,13 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) -> -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => leader_renew_stream_lease, + to_agent => ToAgent, + of_group => OfGroup, + version_old => VersionOld, + version_new => VersionNew + }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) @@ -96,6 +143,14 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> -spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok. leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => leader_update_streams, + to_agent => ToAgent, + of_group => OfGroup, + version_old => VersionOld, + version_new => VersionNew, + streams_new => format_streams(StreamsNew) + }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) @@ -104,8 +159,29 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> -spec leader_invalidate(agent(), group()) -> ok. leader_invalidate(ToAgent, OfGroup) -> + ?tp(warning, shared_sub_proto_msg, #{ + type => leader_invalidate, + to_agent => ToAgent, + of_group => OfGroup + }), _ = emqx_persistent_session_ds_shared_subs_agent:send( ToAgent, ?leader_invalidate(OfGroup) ), ok. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +format_opaque(Opaque) -> + erlang:phash2(Opaque). + +format_streams(Streams) -> + lists:map( + fun format_stream/1, + Streams + ). + +format_stream(#{stream := Stream, iterator := Iterator} = Value) -> + Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index f18114918..6eafe0e4a 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -13,7 +13,8 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/asserts.hrl"). -all() -> emqx_common_test_helpers:all(?MODULE). +all() -> + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> Apps = emqx_cth_suite:start( @@ -51,17 +52,16 @@ end_per_testcase(_TC, _Config) -> ok. t_lease_initial(_Config) -> - ConnPub = emqtt_connect_pub(<<"client_pub">>), - - %% Need to pre-create some streams in "topic/#". - %% Leader is dummy by far and won't update streams after the first lease to the agent. - %% So there should be some streams already when the agent connects. - ok = init_streams(ConnPub, <<"topic1/1">>), - ConnShared = emqtt_connect_sub(<<"client_shared">>), {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1), - {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1), + ConnPub = emqtt_connect_pub(<<"client_pub">>), + + {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello1">>, 1), + ct:sleep(2_000), + {ok, _} = emqtt:publish(ConnPub, <<"topic1/2">>, <<"hello2">>, 1), + + ?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000), ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), ok = emqtt:disconnect(ConnShared), @@ -70,11 +70,6 @@ t_lease_initial(_Config) -> t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), - %% Need to pre-create some streams in "topic/#". - %% Leader is dummy by far and won't update streams after the first lease to the agent. - %% So there should be some streams already when the agent connects. - ok = init_streams(ConnPub, <<"topic2/2">>), - ConnShared = emqtt_connect_sub(<<"client_shared">>), %% Stop registry to simulate unability to find leader. @@ -93,7 +88,6 @@ t_lease_reconnect(_Config) -> 5_000 ), - ct:sleep(1_000), {ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1), ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), @@ -114,7 +108,7 @@ t_renew_lease_timeout(_Config) -> ?wait_async_action( ok = terminate_leaders(), #{?snk_kind := leader_lease_streams}, - 5_000 + 10_000 ), fun(Trace) -> ?strict_causality( @@ -131,15 +125,6 @@ t_renew_lease_timeout(_Config) -> %% Helper functions %%-------------------------------------------------------------------- -init_streams(ConnPub, Topic) -> - ConnRegular = emqtt_connect_sub(<<"client_regular">>), - {ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1), - {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1), - - ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000), - - ok = emqtt:disconnect(ConnRegular). - emqtt_connect_sub(ClientId) -> {ok, C} = emqtt:start_link([ {client_id, ClientId},