feat(queue): fix stream rebalancing issues, update tests

This commit is contained in:
Ilya Averyanov 2024-06-26 22:36:31 +03:00
parent 03fea34962
commit bceb5d43ed
5 changed files with 239 additions and 76 deletions

View File

@ -190,7 +190,7 @@ send_to_subscription_after(Group) ->
with_group_sm(State, Group, Fun) ->
case State of
#{groups := #{Group := GSM0} = Groups} ->
GSM1 = Fun(GSM0),
#{} = GSM1 = Fun(GSM0),
State#{groups => Groups#{Group => GSM1}};
_ ->
%% TODO

View File

@ -122,7 +122,8 @@
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(FIND_LEADER_TIMEOUT, 1000).
-define(RENEW_LEASE_TIMEOUT, 2000).
-define(RENEW_LEASE_TIMEOUT, 5000).
-define(MIN_UPDATE_STREAM_STATE_INTERVAL, 500).
%%-----------------------------------------------------------------------
%% API
@ -204,8 +205,12 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0
%%-----------------------------------------------------------------------
%% Replaying state
handle_replaying(GSM) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
handle_replaying(GSM0) ->
GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL
),
GSM2.
handle_renew_lease_timeout(GSM) ->
?tp(debug, renew_lease_timeout, #{}),
@ -214,8 +219,12 @@ handle_renew_lease_timeout(GSM) ->
%%-----------------------------------------------------------------------
%% Updating state
handle_updating(GSM) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT).
handle_updating(GSM0) ->
GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL
),
GSM2.
%%-----------------------------------------------------------------------
%% Common handlers
@ -223,7 +232,7 @@ handle_updating(GSM) ->
handle_leader_update_streams(
#{
state := ?replaying,
stream_data := #{streams := Streams0, version := VersionOld} = StateData
state_data := #{streams := Streams0, version := VersionOld} = StateData
} = GSM,
VersionOld,
VersionNew,
@ -275,14 +284,19 @@ handle_leader_update_streams(
handle_leader_update_streams(
#{
state := ?updating,
stream_data := #{version := VersionNew} = _StreamData
state_data := #{version := VersionNew} = _StreamData
} = GSM,
_VersionOld,
VersionNew,
_StreamProgresses
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_update_streams(GSM, _VersionOld, _VersionNew, _StreamProgresses) ->
handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) ->
?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{
gsm => GSM,
version_old => VersionOld,
version_new => VersionNew
}),
%% Unexpected versions or state
transition(GSM, ?connecting, #{}).
@ -311,7 +325,13 @@ handle_leader_renew_stream_lease(
VersionNew
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
handle_leader_renew_stream_lease(GSM, _VersionOld, _VersionNew) ->
handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{
gsm => GSM,
version_old => VersionOld,
version_new => VersionNew
}),
%% Unexpected versions or state
transition(GSM, ?connecting, #{}).
handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
@ -319,32 +339,34 @@ handle_stream_progress(#{state := ?connecting} = GSM, _StreamProgresses) ->
handle_stream_progress(
#{
state := ?replaying,
state_data := #{
agent := Agent,
state_data := #{
leader := Leader,
version := Version
}
} = _GSM,
} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, Version
);
),
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL);
handle_stream_progress(
#{
state := ?updating,
state_data := #{
agent := Agent,
state_data := #{
leader := Leader,
version := Version,
prev_version := PrevVersion
}
} = _GSM,
} = GSM,
StreamProgresses
) ->
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, PrevVersion, Version
).
),
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL).
handle_leader_invalidate(GSM) ->
transition(GSM, ?connecting, #{}).
@ -365,7 +387,14 @@ handle_state_timeout(
renew_lease_timeout,
_Message
) ->
handle_renew_lease_timeout(GSM).
handle_renew_lease_timeout(GSM);
handle_state_timeout(
GSM,
update_stream_state_timeout,
_Message
) ->
?tp(debug, update_stream_state_timeout, #{}),
handle_stream_progress(GSM, []).
handle_info(
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info

View File

@ -75,7 +75,7 @@
%% States
-define(leader_waiting_registration, leader_waiting_registration).
-define(leader_replaying, leader_replaying).
-define(leader_active, leader_active).
%% Events
@ -96,6 +96,8 @@
-define(AGENT_TIMEOUT, 5000).
-define(START_TIME_THRESHOLD, 5000).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -133,6 +135,7 @@ init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
group => Group,
topic => Topic,
router_id => gen_router_id(),
start_time => now_ms() - ?START_TIME_THRESHOLD,
stream_progresses => #{},
stream_owners => #{},
agents => #{}
@ -146,37 +149,50 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist
Self = self(),
case Fun() of
Self ->
{next_state, ?replaying, Data, {reply, From, {ok, Self}}};
{next_state, ?leader_active, Data, {reply, From, {ok, Self}}};
OtherPid ->
{stop_and_reply, normal, {reply, From, {ok, OtherPid}}}
end;
%%--------------------------------------------------------------------
%% repalying state
handle_event(enter, _OldState, ?leader_replaying, #{topic := Topic, router_id := RouterId} = _Data) ->
handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := RouterId} = _Data) ->
?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic, router_id => RouterId}),
ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
{keep_state_and_data, [
{state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
{state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}},
{state_timeout, 0, #renew_streams{}}
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
{{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
{{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}
]};
handle_event(state_timeout, #renew_streams{}, ?leader_replaying, Data0) ->
%%--------------------------------------------------------------------
%% timers
%% renew_streams timer
handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}),
Data1 = renew_streams(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
handle_event(state_timeout, #renew_leases{}, ?leader_replaying, Data0) ->
{keep_state, Data1, {{timeout, #renew_streams{}}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
%% renew_leases timer
handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}),
Data1 = renew_leases(Data0),
{keep_state, Data1, {state_timeout, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
handle_event(state_timeout, #drop_timeout{}, ?leader_replaying, Data0) ->
{keep_state, Data1, {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
%% drop_timeout timer
handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}),
Data1 = drop_timeout_agents(Data0),
{keep_state, Data1, {state_timeout, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}};
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_replaying, Data0) ->
{keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}};
%%--------------------------------------------------------------------
%% agent events
handle_event(info, ?agent_connect_leader_match(Agent, _TopicFilter), ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_connect_agent, #{agent => Agent}),
Data1 = connect_agent(Data0, Agent),
{keep_state, Data1};
handle_event(
info,
?agent_update_stream_states_match(Agent, StreamProgresses, Version),
?leader_replaying,
?leader_active,
Data0
) ->
% ?tp(warning, shared_sub_leader_update_stream_states, #{agent => Agent, version => Version}),
Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, Version)
end),
@ -184,9 +200,12 @@ handle_event(
handle_event(
info,
?agent_update_stream_states_match(Agent, StreamProgresses, VersionOld, VersionNew),
?leader_replaying,
?leader_active,
Data0
) ->
% ?tp(warning, shared_sub_leader_update_stream_states, #{
% agent => Agent, version_old => VersionOld, version_new => VersionNew
% }),
Data1 = with_agent(Data0, Agent, fun() ->
update_agent_stream_states(Data0, Agent, StreamProgresses, VersionOld, VersionNew)
end),
@ -195,10 +214,11 @@ handle_event(
%% fallback
handle_event(enter, _OldState, _State, _Data) ->
keep_state_and_data;
handle_event(Event, _Content, State, _Data) ->
handle_event(Event, Content, State, _Data) ->
?SLOG(warning, #{
msg => unexpected_event,
event => Event,
content => Content,
state => State
}),
keep_state_and_data.
@ -218,11 +238,10 @@ terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
%% * Revoke streams from agents having too many streams
%% * Assign streams to agents having too few streams
renew_streams(#{stream_progresses := Progresses, topic := Topic} = Data0) ->
renew_streams(#{start_time := StartTime, stream_progresses := Progresses, topic := Topic} = Data0) ->
TopicFilter = emqx_topic:words(Topic),
StartTime = now_ms(),
{_, Streams} = lists:unzip(
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, now_ms())
emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime)
),
%% TODO https://emqx.atlassian.net/browse/EMQX-12572
%% Handle stream removal
@ -274,6 +293,12 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) ->
false ->
AgentState0;
true ->
?tp(warning, shared_sub_leader_revoke_streams, #{
agent => Agent,
agent_stream_count => length(Streams0),
revoke_count => RevokeCount,
desired_count => DesiredCount
}),
revoke_streams_from_agent(Data0, Agent, AgentState0, RevokeCount)
end,
set_agent_state(Data0, Agent, AgentState1).
@ -321,6 +346,12 @@ assign_lacking_streams(Data0, Agent, DesiredCount) ->
false ->
Data0;
true ->
?tp(warning, shared_sub_leader_assign_streams, #{
agent => Agent,
agent_stream_count => length(Streams0),
assign_count => AssignCount,
desired_count => DesiredCount
}),
assign_streams_to_agent(Data0, Agent, AssignCount)
end.
@ -346,12 +377,15 @@ connect_agent(
#{group := Group} = Data,
Agent
) ->
%% TODO
%% implement graceful reconnection of the same agent
?SLOG(info, #{
msg => leader_agent_connected,
agent => Agent,
group => Group
}),
DesiredCount = desired_streams_per_agent(Data),
% DesiredCount = desired_streams_for_new_agent(Data),
assign_initial_streams_to_agent(Data, Agent, DesiredCount).
assign_initial_streams_to_agent(Data, Agent, AssignCount) ->
@ -388,6 +422,7 @@ drop_timeout_agents(#{agents := Agents} = Data) ->
%% Send lease confirmations to agents
renew_leases(#{agents := AgentStates} = Data) ->
?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
ok = lists:foreach(
fun({Agent, AgentState}) ->
renew_lease(Data, Agent, AgentState)
@ -407,11 +442,11 @@ renew_lease(#{group := Group} = Data, Agent, #{
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, Group, PrevVersion, Version, StreamProgresses
),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion);
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version);
renew_lease(#{group := Group}, Agent, #{
state := ?updating, version := Version, prev_version := PrevVersion
}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version, PrevVersion).
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version).
%%--------------------------------------------------------------------
%% Handle stream progress updates from agent in replaying state
@ -427,7 +462,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, Version) ->
%% Agent finished updating, now replaying
Data1 = update_stream_progresses(Data0, Agent, AgentStreamProgresses),
AgentState1 = update_agent_timeout(AgentState0),
AgentState2 = agent_transition_to_replaying(AgentState1),
AgentState2 = agent_transition_to_replaying(Agent, AgentState1),
set_agent_state(Data1, Agent, AgentState2);
{?replaying, AgentVersion} ->
%% Common case, agent is replaying
@ -521,10 +556,10 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
{AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
AgentState3 =
case AgentState2 of
#{revoke_streams := []} ->
agent_transition_to_waiting_replaying(AgentState2);
#{revoked_streams := []} ->
agent_transition_to_waiting_replaying(Data1, Agent, AgentState2);
_ ->
agent_transition_to_updating(AgentState2)
agent_transition_to_updating(Agent, AgentState2)
end,
set_agent_state(Data2, Agent, AgentState3);
{?updating, AgentPrevVersion, AgentVersion} ->
@ -533,8 +568,8 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
{AgentState2, Data2} = clean_revoked_streams(Data1, AgentState1, AgentStreamProgresses),
AgentState3 =
case AgentState2 of
#{revoke_streams := []} ->
agent_transition_to_waiting_replaying(AgentState2);
#{revoked_streams := []} ->
agent_transition_to_waiting_replaying(Data1, Agent, AgentState2);
_ ->
AgentState2
end,
@ -566,10 +601,15 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
agent_transition_to_waiting_updating(
#{group := Group} = Data,
Agent,
#{version := Version, prev_version := undefined} = AgentState0,
#{state := OldState, version := Version, prev_version := undefined} = AgentState0,
Streams,
RevokedStreams
) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => OldState,
new_state => ?waiting_updating
}),
NewVersion = next_version(Version),
AgentState1 = AgentState0#{
@ -585,7 +625,15 @@ agent_transition_to_waiting_updating(
),
AgentState1.
agent_transition_to_waiting_replaying(AgentState0) ->
agent_transition_to_waiting_replaying(
#{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => OldState,
new_state => ?waiting_replaying
}),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version),
AgentState0#{
state => ?waiting_replaying,
revoked_streams => []
@ -594,6 +642,11 @@ agent_transition_to_waiting_replaying(AgentState0) ->
agent_transition_to_initial_waiting_replaying(
#{group := Group} = Data, Agent, InitialStreams
) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => none,
new_state => ?waiting_replaying
}),
Version = 0,
StreamProgresses = stream_progresses(Data, InitialStreams),
Leader = this_leader(Data),
@ -609,13 +662,23 @@ agent_transition_to_initial_waiting_replaying(
update_deadline => now_ms() + ?AGENT_TIMEOUT
}.
agent_transition_to_replaying(#{state := ?waiting_replaying} = AgentState) ->
agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => ?waiting_replaying,
new_state => ?replaying
}),
AgentState#{
state => ?replaying,
prev_version => undefined
}.
agent_transition_to_updating(#{state := ?waiting_updating} = AgentState) ->
agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => ?waiting_updating,
new_state => ?updating
}),
AgentState#{state => ?updating}.
%%--------------------------------------------------------------------
@ -645,14 +708,24 @@ replaying_agents(#{agents := AgentStates}) ->
maps:to_list(AgentStates)
).
desired_streams_per_agent(#{agents := AgentStates, stream_progresses := StreamProgresses}) ->
AgentCount = maps:size(AgentStates),
desired_streams_per_agent(#{agents := AgentStates} = Data) ->
desired_streams_per_agent(Data, maps:size(AgentStates)).
desired_streams_for_new_agent(#{agents := AgentStates} = Data) ->
desired_streams_per_agent(Data, maps:size(AgentStates) + 1).
desired_streams_per_agent(#{stream_progresses := StreamProgresses}, AgentCount) ->
case AgentCount of
0 ->
0;
_ ->
StreamCount = maps:size(StreamProgresses),
(StreamCount div AgentCount) + 1
case StreamCount rem AgentCount of
0 ->
StreamCount div AgentCount;
_ ->
1 + StreamCount div AgentCount
end
end.
stream_progresses(#{stream_progresses := StreamProgresses} = _Data, Streams) ->

View File

@ -9,6 +9,7 @@
-module(emqx_ds_shared_sub_proto).
-include("emqx_ds_shared_sub_proto.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
agent_connect_leader/3,
@ -47,15 +48,32 @@
stream_progress/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
?tp(warning, shared_sub_proto_msg, #{
type => agent_connect_leader,
to_leader => ToLeader,
from_agent => FromAgent,
topic_filter => TopicFilter
}),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
ok.
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
ok.
@ -63,6 +81,14 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
leader(), agent(), list(agent_stream_progress()), version(), version()
) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_streams(StreamProgresses),
version_old => VersionOld,
version_new => VersionNew
}),
_ = erlang:send(
ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
),
@ -72,6 +98,14 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_lease_streams,
to_agent => ToAgent,
of_group => OfGroup,
leader => Leader,
streams => format_streams(Streams),
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_lease_streams(OfGroup, Leader, Streams, Version)
@ -80,6 +114,12 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
-spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_renew_stream_lease(OfGroup, Version)
@ -88,6 +128,13 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
@ -96,6 +143,14 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
-spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok.
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_update_streams,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew,
streams_new => format_streams(StreamsNew)
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
@ -104,8 +159,29 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
-spec leader_invalidate(agent(), group()) -> ok.
leader_invalidate(ToAgent, OfGroup) ->
?tp(warning, shared_sub_proto_msg, #{
type => leader_invalidate,
to_agent => ToAgent,
of_group => OfGroup
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
ToAgent,
?leader_invalidate(OfGroup)
),
ok.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_opaque(Opaque) ->
erlang:phash2(Opaque).
format_streams(Streams) ->
lists:map(
fun format_stream/1,
Streams
).
format_stream(#{stream := Stream, iterator := Iterator} = Value) ->
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.

View File

@ -13,7 +13,8 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/asserts.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
@ -51,17 +52,16 @@ end_per_testcase(_TC, _Config) ->
ok.
t_lease_initial(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic1/1">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello1">>, 1),
ct:sleep(2_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic1/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 10_000),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ok = emqtt:disconnect(ConnShared),
@ -70,11 +70,6 @@ t_lease_initial(_Config) ->
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic2/2">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
%% Stop registry to simulate unability to find leader.
@ -93,7 +88,6 @@ t_lease_reconnect(_Config) ->
5_000
),
ct:sleep(1_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
@ -114,7 +108,7 @@ t_renew_lease_timeout(_Config) ->
?wait_async_action(
ok = terminate_leaders(),
#{?snk_kind := leader_lease_streams},
5_000
10_000
),
fun(Trace) ->
?strict_causality(
@ -131,15 +125,6 @@ t_renew_lease_timeout(_Config) ->
%% Helper functions
%%--------------------------------------------------------------------
init_streams(ConnPub, Topic) ->
ConnRegular = emqtt_connect_sub(<<"client_regular">>),
{ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
ok = emqtt:disconnect(ConnRegular).
emqtt_connect_sub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},